一个轻量级、高性能的PHP队列处理包,支持Redis驱动,提供完整的任务队列解决方案。
- 🚀 高性能Redis队列驱动
- 📦 开箱即用,零配置启动
- 🔧 灵活的配置系统
- ✨ 灵活的配置系统
- 📊 完整的任务监控和状态管理
- 🔄 任务重试和失败处理
- 📝 详细的日志记录
- 🎯 支持多队列和优先级
- 🛡️ 内存和时间限制保护
- 🔌 易于集成到现有项目
composer require yng/phpqueue
- 克隆仓库到你的项目目录:
git clone https://github.com/yng666/phpqueue.git
- 在你的项目中引入自动加载文件:
require_once 'vendor/autoload.php';
// 或者如果没有使用composer
require_once 'path/to/phpqueue/src/autoload.php';
use Yng\PhpQueue\QueuePackage;
use Yng\PhpQueue\Jobs\Job;
// 使用默认配置初始化
QueuePackage::initWithDefaults();
// 或者指定数据目录
QueuePackage::initWithDefaults('/path/to/your/data');
// 或者从环境变量初始化
QueuePackage::initFromEnv();
use Yng\PhpQueue\Jobs\Job;
class SendEmailJob extends Job
{
protected $email;
protected $subject;
protected $content;
public function __construct($email, $subject, $content)
{
$this->email = $email;
$this->subject = $subject;
$this->content = $content;
}
public function handle()
{
// 发送邮件的逻辑
mail($this->email, $this->subject, $this->content);
$this->log('info', "Email sent to {$this->email}");
}
public function failed(\Exception $exception)
{
$this->log('error', "Failed to send email to {$this->email}: " . $exception->getMessage());
}
}
// 获取队列管理器
$queueManager = QueuePackage::getQueueManager();
$queue = $queueManager->connection();
// 创建并推送任务
$job = new SendEmailJob('user@example.com', 'Hello', 'This is a test email');
$queue->push($job);
// 推送到指定队列
$queue->push($job, 'emails');
// 延迟执行(60秒后)
$queue->later(60, $job);
// 获取工作进程
$worker = QueuePackage::getWorker();
// 处理默认队列的任务
$worker->work('default');
// 处理指定队列的任务
$worker->work('emails');
// 只处理一个任务后退出
$worker->work('default', ['once' => true]);
包提供了便捷的命令行工具:
# 处理默认队列
php vendor/bin/phpqueue work
# 或者
php bin/phpqueue work
# 处理指定队列
php vendor/bin/phpqueue work --queue=emails
# 只处理一个任务
php vendor/bin/phpqueue work --once
# 设置内存限制
php vendor/bin/phpqueue work --memory=256
# 设置超时时间
php vendor/bin/phpqueue work --timeout=120
php vendor/bin/phpqueue status
# 或者
php bin/phpqueue status
php vendor/bin/phpqueue restart
# 或者
php bin/phpqueue restart
创建 .env
文件或设置环境变量:
# Redis配置
REDIS_HOST=127.0.0.1
REDIS_PORT=6379
REDIS_PASSWORD=
REDIS_QUEUE_DB=0
REDIS_QUEUE_PREFIX=queue:
# 队列配置
QUEUE_CONNECTION=redis
QUEUE_DATA_PATH=/path/to/data
QUEUE_LOG_PATH=/path/to/logs
# 工作进程配置
QUEUE_WORKER_SLEEP=3
QUEUE_WORKER_TIMEOUT=60
QUEUE_WORKER_MEMORY=128
QUEUE_WORKER_MAX_JOBS=1000
// config/queue.php
return [
'default' => 'redis',
'connections' => [
'redis' => [
'driver' => 'redis',
'host' => '127.0.0.1',
'port' => 6379,
'password' => null,
'database' => 0,
'prefix' => 'queue:',
],
],
'worker' => [
'sleep' => 3,
'timeout' => 60,
'memory' => 128,
'max_jobs' => 1000,
],
'logging' => [
'enabled' => true,
'level' => 'info',
'path' => '/path/to/logs/queue.log',
'max_files' => 30, // 最大保留日志文件数
'max_size' => '10M', // 单个日志文件最大大小
],
];
// 使用自定义配置初始化
QueuePackage::init([], '/path/to/config/queue.php');
use Yng\PhpQueue\Jobs\Job;
class RetryableJob extends Job
{
public $tries = 3; // 最大重试次数
public $timeout = 120; // 超时时间(秒)
public function handle()
{
// 可能失败的操作
if (rand(1, 10) > 7) {
throw new \Exception('Random failure');
}
$this->log('info', 'Job completed successfully');
}
public function retryUntil()
{
return now()->addMinutes(10); // 10分钟内可以重试
}
}
// 批量推送任务
$jobs = [
new SendEmailJob('user1@example.com', 'Hello', 'Message 1'),
new SendEmailJob('user2@example.com', 'Hello', 'Message 2'),
new SendEmailJob('user3@example.com', 'Hello', 'Message 3'),
];
foreach ($jobs as $job) {
$queue->push($job, 'emails');
}
// 高优先级任务
$urgentJob = new SendEmailJob('admin@example.com', 'Urgent', 'Important message');
$queue->push($urgentJob, 'high-priority');
// 工作进程按优先级处理
$worker->work(['high-priority', 'default', 'low-priority']);
// 失败的任务会记录在日志文件中
// 默认位置:{data_path}/log/queue_failed.log
// 可以通过命令行查看
php vendor/bin/phpqueue status --failed
// 或者
php bin/phpqueue status --failed
use Yng\PhpQueue\Jobs\Job;
class LoggingJob extends Job
{
public function handle()
{
$this->log('info', 'Job started');
// 执行任务逻辑
$this->log('info', 'Job completed');
}
}
PhpQueue 支持自动日志轮转功能,防止日志文件过大:
// config/queue.php
'logging' => [
'enabled' => true,
'level' => 'info',
'path' => '/path/to/logs/queue.log',
'max_files' => 30, // 最大保留日志文件数(默认30个)
'max_size' => '10M', // 单个日志文件最大大小(默认10MB)
],
queue_worker.log
- 工作进程运行日志queue_failed.log
- 失败任务日志queue_error.log
- 系统错误日志
当日志文件达到 max_size
设定的大小时,系统会自动进行轮转:
- 当前日志文件重命名为
.1.log
- 历史日志文件依次重命名(
.1.log
→.2.log
,.2.log
→.3.log
,以此类推) - 超过
max_files
数量的最旧日志文件会被删除 - 创建新的日志文件继续记录
K
或k
- KB(千字节)M
或m
- MB(兆字节)G
或g
- GB(吉字节)- 纯数字 - 字节数
示例:
1024
- 1024字节500K
- 500KB10M
- 10MB1G
- 1GB
如果需要手动清理日志文件:
# 清理所有日志文件
rm -f /path/to/logs/*.log*
# 只清理轮转的历史日志文件
rm -f /path/to/logs/*.*.log
- PHP >= 7.4
- Redis 扩展
- JSON 扩展
- Redis 服务器 >= 3.0
MIT License
欢迎提交 Issue 和 Pull Request!
- GitHub 仓库:https://github.com/yng666/phpqueue
- 提交 Issue:https://github.com/yng666/phpqueue/issues
- 提交 Pull Request:https://github.com/yng666/phpqueue/pulls
- 新增日志轮转功能,防止日志文件过大
- 支持配置最大日志文件数量和单文件大小限制
- 自动清理超过限制的历史日志文件
- 支持多种文件大小格式(K/M/G)
- 优化日志记录性能
- 统一命名空间为 Yng\PhpQueue
- 更新包名为 yng/phpqueue
- 优化代码结构和文档
- 完善 GitHub 链接和贡献指南
- 性能优化和 bug 修复
- 初始版本
- 基本队列功能
- Redis 驱动支持
- 命令行工具
- 多语言支持(中文/英文)