AMQP library wrapper for php-amqplib,基于 php-amqplib 封装的 RabbitMQ 客户端组件。
composer require ssh/amqp或者手动安装:
cd /Users/wrkj/Workspace/Php/2025/ssh/rabbitmq
composer install在 webman 框架中创建配置文件 config/plugin/webman/amqp/amqp.php:
<?php
return [
'default' => [
'host' => 'localhost',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
'vhost' => '/',
'options' => [
'connection_timeout' => 3.0,
'read_write_timeout' => 3.0,
'heartbeat' => 0,
'max_reconnect_attempts' => 3, // 最大重连次数
'reconnect_delay' => 1, // 重连间隔(秒)
],
],
'cluster' => [
'host' => '192.168.1.100',
'port' => 5672,
'user' => 'admin',
'password' => 'admin123',
'vhost' => '/',
'options' => [
'max_reconnect_attempts' => 5,
'reconnect_delay' => 2,
],
],
];use ssh\Amqp\Client;
// 发送字符串消息(默认交换机,routing_key = 队列名)
Client::send('my_queue', 'Hello World!');
// 使用指定连接
Client::send('my_queue', 'Hello World!', 'consumer');
// 使用指定连接和配置
Client::send('my_queue', 'Hello World!', 'consumer', 'plugin.rabbitmq.rabbitmq');
// 发送到交换机
Client::send(
'my_queue', // 队列名
'Hello World!', // 消息内容
'consumer', // 连接名
'plugin.rabbitmq.rabbitmq', // 配置名(可选)
[], // 消息属性
'my_exchange', // 交换机名
'my_routing_key' // 路由键(可选,默认为队列名)
);
// 发送 JSON 数据
Client::send('my_queue', json_encode(['id' => 1, 'name' => 'test']));
// 带成功/失败回调
Client::send(
'my_queue',
'Hello World!',
'consumer',
'plugin.rabbitmq.rabbitmq',
[],
'',
'my_queue',
function($msg, $queue, $exchange, $routing_key) {
echo "发送成功!\n";
echo "消息: " . $msg->body . "\n";
},
function($e, $msg, $queue, $exchange, $routing_key) {
echo "发送失败:" . $e->getMessage() . "\n";
}
);
// 使用返回值判断
try {
$result = Client::send('my_queue', 'Hello World!');
if ($result) {
echo "发送成功!\n";
}
} catch (\Exception $e) {
echo "发送失败:" . $e->getMessage() . "\n";
}use ssh\Amqp\Client;
$client = Client::connection('consumer');
// 简单发送
$result = $client->publish('my_queue', '', 'my_routing_key', 'Hello AMQP!');
if ($result) {
echo "发送成功!\n";
}
// 发送消息到指定交换机
$client->publish('my_queue', 'my_exchange', 'my_routing_key', 'Hello AMQP!');
// 带回调的发送
$client->publish(
'my_queue',
'',
'my_routing_key',
'Hello AMQP!',
false,
false,
0,
function($msg, $queue, $exchange, $routing_key) {
echo "发送成功!\n";
},
function($e, $msg, $queue, $exchange, $routing_key) {
echo "发送失败:" . $e->getMessage() . "\n";
}
);use ssh\Amqp\Client;
use PhpAmqpLib\Message\AMQPMessage;
$msg = new AMQPMessage('Hello World!', [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'content_type' => 'application/json',
'priority' => 5,
]);
$client = Client::connection('default');
$client->publish('my_queue', 'my_exchange', 'my_routing_key', $msg);创建一个实现 Consumer 接口的类:
<?php
namespace app\amqp;
use ssh\Amqp\Client;
use ssh\Amqp\Consumer;
use PhpAmqpLib\Message\AMQPMessage;
class MyConsumer implements Consumer
{
// 连接名称,默认 'default'
public $connection = 'default';
// 队列名称
public $queue = 'my_queue';
// 是否自动确认,false 表示需要手动 ack
public $no_ack = false;
// 预取消息数量
public $prefetch_count = 1;
/**
* 消费消息
*
* @param string $data 消息内容
* @param array $properties 消息属性
* @param AMQPMessage $msg AMQP 消息对象
* @param Client $client AMQP 客户端实例
*/
public function consume($data, $properties, $msg, $client)
{
// 处理消息
echo "Received: {$data}\n";
// 手动确认消息
$client->ack($msg);
// 或者拒绝消息并重新入队
// $client->nack($msg, false, true);
}
}<?php
namespace app\amqp;
use ssh\Amqp\Client;
use ssh\Amqp\Consumer;
use PhpAmqpLib\Message\AMQPMessage;
class ExchangeConsumer implements Consumer
{
public $connection = 'default';
// 交换机名称
public $exchange = 'my_exchange';
// 交换机类型:direct, fanout, topic
public $exchange_type = 'direct';
public $exchange_durable = true;
// 队列名称
public $queue = 'my_queue';
public $queue_durable = true;
// 路由键
public $routing_key = 'my_routing_key';
// 消息确认模式
public $no_ack = false;
// 预取数量
public $prefetch_count = 10;
public function consume($data, $properties, $msg, $client)
{
echo "Received: {$data}\n";
// 处理业务逻辑
try {
// 处理成功,确认消息
$client->ack($msg);
} catch (\Exception $e) {
// 处理失败,拒绝消息并重新入队
$client->nack($msg, false, true);
}
}
}在 config/plugin/webman/amqp/process.php 中添加:
<?php
return [
// 消费者进程
ssh\Amqp\Exception\Process\Consumer::class => [
'consumer_dir' => base_path() . '/app/amqp', // 消费者类所在目录
],
];当消费者进程启动后,会执行以下步骤:
- 扫描消费者类:扫描指定目录下的所有 PHP 文件,查找实现了
ssh\Amqp\Consumer接口的类 - 设置消费者:为每个消费者类创建 AMQP 连接、声明交换机和队列、绑定关系
- 启动消息循环:进入无限循环,持续监听消息并调用相应的消费者处理
- 自动重连:如果连接断开,会自动尝试重连
- 异常处理:消息处理过程中的异常会被捕获并记录,同时根据配置决定是否重新入队
interface Consumer
{
/**
* 返回队列名称
*
* @return string
*/
public function queue();
/**
* 消费消息
*
* @param string $data 消息内容
* @param array $properties 消息属性
* @param AMQPMessage $msg AMQP 消息对象
* @param Client $client AMQP 客户端
*/
public function consume($data, $properties, $msg, $client);
}connection: 连接名称,默认 'default'exchange: 交换机名称(可选)exchange_type: 交换机类型,默认 'direct'exchange_durable: 交换机是否持久化,默认 truerouting_key: 路由键(可选)queue_durable: 队列是否持久化,默认 truequeue_exclusive: 队列是否独占,默认 falsequeue_auto_delete: 队列是否自动删除,默认 falsequeue_arguments: 队列额外参数no_ack: 是否自动确认,默认 falseprefetch_count: 预取消息数量,默认 1config: 配置名称(可选)
shouldRequeue(\Exception $e): 判断处理失败的消息是否应该重新入队,返回 true 表示重新入队,false 表示丢弃。默认返回 true。
use ssh\Amqp\Exception\Client;
use PhpAmqpLib\Exchange\AMQPExchangeType;
$client = Client::connection('default');
// 声明交换机
$client->declareExchange(
'my_exchange',
AMQPExchangeType::DIRECT,
false, // passive
true, // durable
false // auto_delete
);
// 声明队列
$client->declareQueue(
'my_queue',
false, // passive
true, // durable
false, // exclusive
false, // nowait
null // arguments
);
// 绑定队列到交换机
$client->bindQueue(
'my_queue',
'my_exchange',
'my_routing_key'
);use ssh\Amqp\Exception\Client;
$client = Client::connection('default');
// 设置预取数量
$client->qos(0, 10);
// 开始消费
$client->consume('my_queue', '', false, false, false, false, function ($msg) {
echo "Received: {$msg->body}\n";
});
// 保持运行
while (true) {
$client->wait();
}消费者接收的 $properties 数组包含以下可能的字段:
content_type: 内容类型content_encoding: 内容编码delivery_mode: 投递模式priority: 优先级correlation_id: 关联 IDreply_to: 回复地址expiration: 过期时间message_id: 消息 IDtimestamp: 时间戳type: 消息类型user_id: 用户 IDapp_id: 应用 ID
组件提供了以下异常类:
ssh\Amqp\Exception\AmqpException: 所有 AMQP 异常的基类ssh\Amqp\Exception\ConnectionException: 连接相关的异常ssh\Amqp\Exception\ChannelException: 通道相关的异常ssh\Amqp\Exception\PublishException: 发布消息相关的异常ssh\Amqp\Exception\ConsumeException: 消费消息相关的异常ssh\Amqp\Exception\QueueException: 队列相关的异常ssh\Amqp\Exception\ExchangeException: 交换机相关的异常
use ssh\Amqp\Exception\Client;
use ssh\Amqp\Exception\Exception\ConnectionException;
use ssh\Amqp\Exception\Exception\PublishException;
try {
Client::send('my_queue', 'Hello World!');
} catch (ConnectionException $e) {
// 处理连接异常
echo "连接失败: " . $e->getMessage() . "\n";
// 可以尝试重试或降级处理
} catch (PublishException $e) {
// 处理发布异常
echo "发布消息失败: " . $e->getMessage() . "\n";
}消费者进程会自动处理消息处理过程中的异常,并记录日志。如果消费者实现了 shouldRequeue 方法,可以控制失败的消息是否重新入队:
<?php
namespace app\amqp;
use ssh\Amqp\Client;
use ssh\Amqp\Consumer;
use PhpAmqpLib\Message\AMQPMessage;
class MyConsumer implements Consumer
{
public $connection = 'default';
public $queue = 'my_queue';
public $no_ack = false;
public $prefetch_count = 1;
/**
* 判断处理失败的消息是否应该重新入队
*
* @param \Exception $e
* @return bool
*/
public function shouldRequeue(\Exception $e)
{
// 对于数据库连接失败等临时问题,重新入队
if (strpos($e->getMessage(), 'connection') !== false) {
return true;
}
// 对于其他异常(如格式错误),不重新入队
return false;
}
public function consume($data, $properties, $msg, $client)
{
// 处理消息
$client->ack($msg);
}
}Client 类提供了以下方法来管理连接:
use ssh\Amqp\Exception\Client;
$client = Client::connection('default');
// 检查连接是否正常
if ($client->isConnected()) {
echo "连接正常\n";
}
// 手动重连
$client->reconnect();
// 关闭连接
$client->close();在消费者进程中可以配置日志记录器:
use ssh\Amqp\Process\Consumer;
use Monolog\Logger;
use Monolog\Handler\StreamHandler;
$logger = new Logger('amqp');
$logger->pushHandler(new StreamHandler('path/to/log/file.log', Logger::DEBUG));
$consumer = new Consumer(base_path() . '/app/amqp');
$consumer->setLogger($logger);Client::connection($name = 'default', $config = null): 获取连接实例Client::send($queue, $body, $connection = 'default', $config = null, $properties = [], $exchange = '', $routing_key = null, $onSuccess = null, $onError = null): 发送消息,返回 bool
publish($queue, $exchange, $routing_key, $msg, $mandatory, $immediate, $ticket, $onSuccess, $onError): 发布消息,返回 booldeclareQueue(...): 声明队列declareExchange(...): 声明交换机bindQueue(...): 绑定队列consume(...): 消费消息ack($delivery_tag, $multiple, $requeue): 确认消息nack($delivery_tag, $multiple, $requeue): 拒绝消息reject($delivery_tag, $requeue): 拒绝单条消息qos(...): 设置 QoSwait($timeout): 等待消息close(): 关闭连接isConnected(): 检查连接是否正常reconnect(): 重新连接getChannel(): 获取通道对象getConnection(): 获取连接对象
$onSuccess: 成功回调函数,参数为($msg, $queue, $exchange, $routing_key)$onError: 失败回调函数,参数为($e, $msg, $queue, $exchange, $routing_key)
- PHP >= 7.4
- php-amqplib/php-amqplib >= 3.5
MIT