一个为 ThinkPHP 8 提供异步任务队列能力的组件,支持 PHP 8.0+。
内置驱动:
sync—— 同步执行(默认,调试用)database—— 数据库驱动redis—— Redis 驱动- 自定义驱动(传入完整类名即可)
composer require watsonhaw/think-queue安装完成后会自动在 config/queue.php 生成配置文件。
配置文件位于 config/queue.php,主要结构如下:
return [
// 默认连接
'default' => 'sync',
// 各连接配置
'connections' => [
'sync' => [
'type' => 'sync',
],
'database' => [
'type' => 'database',
'queue' => 'default',
'table' => 'jobs',
'connection' => null,
],
'redis' => [
'type' => 'redis',
'queue' => 'default',
'host' => '127.0.0.1',
'port' => 6379,
'password' => '',
'select' => 0,
'timeout' => 0,
'persistent' => false,
],
],
// 失败任务表配置
'failed' => [
'type' => 'none',
'table' => 'failed_jobs',
],
];推荐使用 app\job 作为任务类的命名空间,也可放在任意可自动加载的目录。
任务类无需继承任何类,只需约定:
| 方法 | 说明 |
|---|---|
fire(Job $job, mixed $data) / 任意自定义方法名 |
任务执行入口,接受当前任务对象和自定义数据 |
failed(mixed $data) |
(可选)任务达到最大重试次数后调用 |
namespace app\job;
use think\queue\Job;
class SendEmail
{
public function fire(Job $job, mixed $data): void
{
// 执行具体任务,例如发送邮件
// ...
// 检查当前重试次数
if ($job->attempts() > 3) {
// 已重试 3 次仍未成功
$job->delete();
return;
}
// 执行成功后手动删除任务(否则会重复执行)
$job->delete();
// 或重新发布(延迟执行)
// $job->release(60);
}
public function failed(mixed $data): void
{
// 任务达到最大重试次数后的逻辑
}
}namespace app\job;
use think\queue\Job;
class Notification
{
public function sendEmail(Job $job, mixed $data): void
{
// ...
}
public function sendSms(Job $job, mixed $data): void
{
// ...
}
public function failed(mixed $data): void
{
// ...
}
}任务类可以使用 think\queue\Queueable trait,在发布任务时进行链式配置:
use think\queue\ShouldQueue;
use think\queue\Queueable;
class SendEmail implements ShouldQueue
{
use Queueable;
public function handle(): void
{
// 任务逻辑
}
}发布时:
use think\facade\Queue;
(new SendEmail())
->onConnection('redis') // 指定连接
->onQueue('high') // 指定队列
->delay(30) // 延迟 30 秒
->dispatch(); // 发布到队列
// 或直接使用门面:
Queue::push(SendEmail::class);
Queue::later(60, SendEmail::class, ['key' => 'value']);通过门面类 \think\facade\Queue 发布任务:
use think\facade\Queue;
// 立即发布(使用默认连接与队列)
Queue::push('app\job\SendEmail', ['to' => 'user@example.com']);
// 指定队列
Queue::push('app\job\SendEmail', ['to' => 'user@example.com'], 'high');
// 延迟发布($delay 秒后执行)
Queue::later(300, 'app\job\SendEmail', ['to' => 'user@example.com']);
// 多任务类时,使用 @method 语法
Queue::push('app\job\Notification@sendSms', ['to' => '13800138000']);push 与 later 的返回值:
sync驱动:null(同步执行完成)database驱动:任务 ID(int)redis驱动:true(成功时)
# 使用默认连接
php think queue:work
# 指定连接和队列
php think queue:work redis --queue=high
# 仅执行一次后退出
php think queue:work --once
# 配置参数
php think queue:work redis --queue=default --delay=0 --memory=128 --sleep=3 --tries=0参数说明:
| 参数 | 默认值 | 说明 |
|---|---|---|
connection |
config('queue.default') |
使用的队列连接名称 |
--queue |
default |
监听的队列名,多个用逗号分隔 |
--once |
— | 仅处理一个任务后退出 |
--delay |
0 |
任务失败后重新入队的延迟秒数 |
--memory |
128 |
内存限制(MB),超出后进程终止 |
--sleep |
3 |
队列空时的休眠秒数 |
--tries |
0 |
任务最大重试次数,0 表示无限重试 |
php think queue:listen
php think queue:listen redis --queue=high --delay=0 --memory=128 --sleep=3 --tries=0生产环境推荐配合 supervisor 或 systemd 保证进程常驻。
使用 database 驱动前需要创建 jobs 数据表:
php think queue:table
php think migrate:run如果需要记录失败任务:
php think queue:failed-table
php think migrate:run| 命令 | 作用 |
|---|---|
php think queue:failed |
列出所有失败的任务 |
php think queue:retry <id> |
重新发布指定 ID 的失败任务 |
php think queue:retry all |
重新发布所有失败任务 |
php think queue:forget <id> |
删除指定 ID 的失败任务 |
php think queue:flush |
清空所有失败任务 |
组件在任务生命周期会触发以下事件,可在事件订阅者中监听:
| 事件类 | 触发时机 | 公开属性(均为 readonly) |
|---|---|---|
think\queue\event\JobProcessing |
任务开始执行前 | $connection, $job |
think\queue\event\JobProcessed |
任务执行完成后 | $connection, $job |
think\queue\event\JobExceptionOccurred |
任务执行过程中出现异常 | $connection, $job, $exception |
think\queue\event\JobFailed |
任务失败(超过最大重试次数) | $connection, $job, $exception |
think\queue\event\WorkerStopping |
Worker 进程停止前 | $status |
示例:
use think\queue\event\JobFailed;
$this->app->event->listen(JobFailed::class, function (JobFailed $event): void {
// 通过 public readonly 属性直接访问
$connection = $event->connection;
$exception = $event->exception;
$payload = $event->job->payload();
// 日志、告警等
});| 项 | 版本 |
|---|---|
| ThinkPHP | ^8.0 |
| PHP | >= 8.0 |
| 可选扩展 | redis(使用 redis 驱动时)、pcntl(信号处理,可选) |