统一消息层 | WebSocket / SSE / MQTT / UDP / Long-Polling / CoAP / NATS / STOMP / gRPC / WebTransport / RTMP | PHP 8.2+ | PSR 合规 | 可插拔适配器
kode/messaging 是 kode/* 家族中的统一消息层 Composer 包,封装 WebSocket、SSE、MQTT、UDP、Long-Polling、CoAP、NATS、STOMP、gRPC、WebTransport、RTMP 等 11 种长连接 / 实时消息协议,提供一致的 API、协议无关的消息抽象和可插拔的扩展点。
一个 Messaging::server() 启动所有协议,业务代码面向接口编程,不感知具体协议。
| 特性 | 说明 |
|---|---|
| 🌐 11 协议统一 | WebSocket / SSE / MQTT / UDP / Long-Polling / CoAP / NATS / STOMP / gRPC / WebTransport / RTMP |
| 🔌 可插拔适配器 | 新增协议不改动核心代码(3 步:Adapter / 注册 / 文档) |
| 📡 协议无关 | 业务层只依赖 MessageInterface |
| 🧩 中间件管道 | 鉴权 / 限流 / 编解码 / 校验 / 追踪 |
| 🏷️ 路由 | 基于 event / topic 路由消息 |
| 📢 统一 Pub/Sub | 进程内 / 跨进程 / 跨节点(Redis) |
| 🛡️ PSR 合规 | PSR-3 / PSR-4 / PSR-7 / PSR-14 / PSR-18 |
| ⚡ 协程友好 | 与 kode/fibers 协作,无 Fiber 自动降级 |
| 🔒 安全 | TLS、JWT、Origin 校验、签名鉴权、限流 |
| 📊 可观测 | 事件、指标、结构化日志 |
| 🚀 高性能 | Swoole / Swow / stream 多传输层 |
| 🔁 重连 / 心跳 | 客户端内置指数退避重连、ping/pong |
composer require kode/messaging按需安装可选依赖:
composer require kode/fibers # 协程
composer require kode/event # 事件派发
composer require kode/process # 多 Worker / 集群
composer require kode/queue # MQTT QoS 落地、延迟消息
composer require kode/jwt # JWT 鉴权
composer require nyholm/psr7 # SSE 的 PSR-7 基础<?php
require __DIR__ . '/vendor/autoload.php';
use Kode\Messaging\Messaging;
Messaging::server('ws://0.0.0.0:8080')
->on('connection.open', fn($c) => $c->send('welcome'))
->on('message.received', fn($c, $m) => $c->send("echo: {$m->payload()}"))
->start();Messaging::server('sse://0.0.0.0:8081')
->interval(1000)
->on('interval', fn($c) => $c->send(['event' => 'tick', 'data' => time()]))
->start();Messaging::client('mqtt://broker.example.com:1883')
->withClientId('device-001')
->subscribe('sensors/+/temperature', function ($topic, $payload) {
echo "[$topic] $payload\n";
})
->connect()
->loop();$client = Messaging::client('nats://broker:4222');
$client->subscribe('orders.*', function ($subject, $payload) {
echo "[$subject] $payload\n";
});
$client->connect();
$client->publish('orders.created', json_encode(['id' => 1]));$client = Messaging::client('stomp://broker:61613');
$client->subscribe('/queue/orders', function ($data) {
echo $data['body'] . "\n";
});
$client->connect();
$client->send('/queue/orders', 'hello');$client = Messaging::client('grpc://api.example.com:50051');
$response = $client->call('/helloworld.Greeter/SayHello', $reqPayload);$client = Messaging::client('webtransport://example.com:4433');
$conn = $client->connect();
$conn->sendBidirectional('hello');
$conn->sendDatagram('ping', reliable: false);Messaging::server('rtmp://0.0.0.0:1935')
->on('message.received', fn($c, $m) => log_rtmp($m))
->start();| 协议 | 方案 | 服务端 | 客户端 | 适用 |
|---|---|---|---|---|
| WebSocket | ws:// / wss:// |
✅ | ✅ | 浏览器长连接、聊天、游戏 |
| SSE | sse:// |
✅ | ✅ | 服务端推送、通知、大屏 |
| MQTT 3.1.1 / 5.0 | mqtt:// / mqtts:// |
实验性 Broker | ✅ | IoT、移动推送、Pub/Sub |
| UDP / Datagram | udp:// |
✅ | ✅ | 实时音视频、游戏、广播 |
| Long-Polling | poll:// / http:// |
✅ | ✅ | WebSocket 回退、低频推送 |
| CoAP (RFC 7252) | coap:// / coaps:// |
✅ | ✅ | IoT 传感器、NB-IoT、LoRa |
| NATS | nats:// |
✅ 嵌入式 Broker | ✅ | 微服务 Pub/Sub、request/reply |
| STOMP 1.2 | stomp:// |
✅ 嵌入式 Broker | ✅ | 消息队列(兼容 RabbitMQ / ActiveMQ) |
| gRPC Streaming | grpc:// |
✅ | ✅ | 微服务 RPC、4 种流式调用 |
| WebTransport | wt:// / webtransport:// |
HTTP/3-fallback | ✅ | HTTP/3 双工(依赖 aioquic / msquic) |
| RTMP | rtmp:// / rtmps:// |
✅ | ✅ | 直播源接入(OBS / FMLE) |
┌────────────────────────────────────────────────────────┐
│ Layer 5 — 应用层 │
│ Messaging::server()->on('message.received') │
├────────────────────────────────────────────────────────┤
│ Layer 4 — 中间件管道(Auth → RateLimit → Codec → ...) │
├────────────────────────────────────────────────────────┤
│ Layer 3 — 协议适配器(WebSocket / SSE / MQTT / ...) │
├────────────────────────────────────────────────────────┤
│ Layer 2 — 消息抽象(MessageInterface / Connection) │
├────────────────────────────────────────────────────────┤
│ Layer 1 — 传输层(stream / sockets / swoole / swow) │
└────────────────────────────────────────────────────────┘
| 场景 | 依赖包 | 协作方式 |
|---|---|---|
| 日志 | kode/log 或 PSR-3 |
LoggerInterface 注入 |
| 协程 | kode/fibers |
长连接内启动 Fiber |
| 事件 | kode/event |
connection.open 等事件派发 |
| 上下文 | kode/context |
连接 ID、追踪 |
| 队列 | kode/queue |
MQTT QoS 1/2 落地 |
| 进程 | kode/process |
多 Worker、集群 |
| 缓存 | kode/cache |
会话、广播订阅 |
| HTTP | kode/http / nyholm/psr7 |
SSE 复用 HTTP |
| HTTP 客户端 | kode/http-client |
长轮询回退 |
| 鉴权 | kode/jwt |
JWT 鉴权中间件 |
- 最低:PHP 8.2
- 推荐:PHP 8.3 / 8.4
- 已验证:PHP 8.5
支持的现代特性:
| 特性 | 用法 | 版本 |
|---|---|---|
readonly class |
不可变消息体 | ≥ 8.2 |
enum |
协议状态机 | ≥ 8.1 |
Fibers |
协程 | ≥ 8.1 |
| typed class constants | 协议常量 | ≥ 8.3 |
#[\Override] |
覆盖标记 | ≥ 8.3 |
| property hooks | 连接属性 | ≥ 8.4 |
pipe operator |> |
链式构造 | ≥ 8.5 |
8.5 不可用时自动提供 Messaging::pipeline() 等价实现。
- docs/index.md — 文档总览
- docs/quick-start.md — 快速开始
- docs/architecture.md — 架构设计
- docs/websocket.md — WebSocket 协议指南
- docs/sse.md — SSE 协议指南
- docs/mqtt.md — MQTT 协议指南
- docs/udp.md — UDP 协议指南
- docs/long-polling.md — Long-Polling 协议指南
- docs/coap.md — CoAP 协议指南
- docs/nats.md — NATS 协议指南
- docs/stomp.md — STOMP 协议指南
- docs/grpc.md — gRPC Streaming 协议指南
- docs/webtransport.md — WebTransport 协议指南
- docs/rtmp.md — RTMP 协议指南
- docs/roadmap.md — 协议扩展路线图
- docs/pubsub.md — 发布订阅总线
- docs/middleware.md — 中间件
- docs/configuration.md — 配置
- docs/deployment.md — 部署
- docs/release.md — 发布流程
- docs/migration.md — 从其它框架迁移
- docs/examples/ — 完整示例
examples/websocket_server.php— WebSocket 服务端examples/websocket_client.php— WebSocket 客户端examples/sse_server.php— SSE 服务端examples/mqtt_publish.php— MQTT 发布examples/mqtt_subscribe.php— MQTT 订阅examples/udp_client.php— UDP 客户端examples/coap_server.php/coap_client.php— CoAP 服务端 / 客户端examples/nats_server.php/nats_client.php— NATS 服务端 / 客户端examples/stomp_server.php— STOMP 服务端examples/grpc_server.php— gRPC 服务端examples/longpolling_server.php/longpolling_client.php— Long-Pollingexamples/webtransport_server.php— WebTransportexamples/rtmp_server.php— RTMP 直播源接入docs/examples/chat.php— 聊天室docs/examples/push.php— 实时通知docs/examples/iot.php— IoT 设备docs/examples/rpc.php— RPC over WebSocket
Apache-2.0