diff --git a/src/websocket-server/src/Listener/WorkerEndSubscriber.php b/src/websocket-server/src/Listener/WorkerEndSubscriber.php index d93c98ae0..dba350803 100644 --- a/src/websocket-server/src/Listener/WorkerEndSubscriber.php +++ b/src/websocket-server/src/Listener/WorkerEndSubscriber.php @@ -2,6 +2,7 @@ namespace Swoft\WebSocket\Server\Listener; +use Swoft\Config\Annotation\Mapping\Config; use Swoft\Event\Annotation\Mapping\Subscriber; use Swoft\Event\EventInterface; use Swoft\Event\EventSubscriberInterface; @@ -18,6 +19,12 @@ */ class WorkerEndSubscriber implements EventSubscriberInterface { + /** + * @Config("websocket.autoCloseOnWorkerEnd") + * @var int + */ + private $autoCloseConnection = 0; + /** * Configure events and corresponding processing methods (you can configure the priority) * @@ -39,6 +46,11 @@ public static function getSubscribedEvents(): array */ public function onWorkerStop(EventInterface $event): void { + // If not enable + if ($this->autoCloseConnection === 0) { + return; + } + /** @var WebSocketServer $server */ $server = $event->getTarget(); diff --git a/src/websocket-server/src/Message/Response.php b/src/websocket-server/src/Message/Response.php index 6709f1ad4..15a37ef90 100644 --- a/src/websocket-server/src/Message/Response.php +++ b/src/websocket-server/src/Message/Response.php @@ -5,6 +5,7 @@ use Swoft\Bean\Annotation\Mapping\Bean; use Swoft\Context\Context; use Swoft\Exception\SwoftException; +use Swoft\Log\Helper\CLog; use Swoft\Server\Concern\CommonProtocolDataTrait; use Swoft\Session\Session; use Swoft\WebSocket\Server\Connection; @@ -96,9 +97,11 @@ public static function new(int $sender = -1): self { $self = bean(self::class); - // Set properties - $self->sent = false; - $self->sender = $sender; + // Use sender as default receiver + $self->fd = $sender; + $self->sent = false; + $self->sender = $sender; + $self->sendToAll = false; return $self; @@ -190,11 +193,22 @@ public function send(Connection $conn = null): int return $server->sendToAll($content, $this->sender, $pageSize, $this->opcode); } - // To some users + // To special users if ($this->fds) { return $server->sendToSome($content, $this->fds, $this->noFds, $this->sender, $pageSize, $this->opcode); } + // Except some users + if ($this->noFds) { + return $server->sendToSome($content, [], $this->noFds, $this->sender, $pageSize, $this->opcode); + } + + // No receiver + if ($this->fd < 1) { + CLog::warning('no receiver for the response message'); + return 0; + } + // To one user $ok = $server->sendTo($this->fd, $content, $this->sender, $this->opcode, $this->finish); @@ -246,7 +260,7 @@ public function getFd(): int * * @return self */ - public function setFd(int $fd): ResponseInterface + public function setFd(int $fd): self { $this->fd = $fd; return $this; @@ -265,7 +279,7 @@ public function getOpcode(): int * * @return self */ - public function setOpcode(int $opcode): ResponseInterface + public function setOpcode(int $opcode): self { if ($opcode > 0 && $opcode < 11) { $this->opcode = $opcode; @@ -287,7 +301,7 @@ public function isFinish(): bool * * @return self */ - public function setFinish(bool $finish): ResponseInterface + public function setFinish(bool $finish): self { $this->finish = $finish; return $this; @@ -312,6 +326,15 @@ public function setSender(int $sender): ResponseInterface return $this; } + /** + * @return Response|self + */ + public function noSender(): self + { + $this->sender = 0; + return $this; + } + /** * @return string */ @@ -322,6 +345,7 @@ public function getContent(): string /** * @param string $content + * * @return Response|ResponseInterface */ public function setContent(string $content): ResponseInterface diff --git a/src/websocket-server/src/Swoole/HandshakeListener.php b/src/websocket-server/src/Swoole/HandshakeListener.php index 045c1c642..73f228de8 100644 --- a/src/websocket-server/src/Swoole/HandshakeListener.php +++ b/src/websocket-server/src/Swoole/HandshakeListener.php @@ -2,11 +2,9 @@ namespace Swoft\WebSocket\Server\Swoole; -use ReflectionException; use Swoft; use Swoft\Bean\Annotation\Mapping\Bean; use Swoft\Bean\BeanFactory; -use Swoft\Bean\Exception\ContainerException; use Swoft\Co; use Swoft\Context\Context; use Swoft\Http\Message\Request as Psr7Request; @@ -42,8 +40,6 @@ class HandshakeListener implements HandshakeInterface * @param Response $response * * @return bool - * @throws ReflectionException - * @throws ContainerException * @throws Throwable */ public function onHandshake(Request $request, Response $response): bool diff --git a/src/websocket-server/src/Swoole/MessageListener.php b/src/websocket-server/src/Swoole/MessageListener.php index 3df73244f..04d9145a7 100644 --- a/src/websocket-server/src/Swoole/MessageListener.php +++ b/src/websocket-server/src/Swoole/MessageListener.php @@ -58,7 +58,7 @@ public function onMessage(Server $server, Frame $frame): void try { // Trigger message before event - Swoft::trigger(WsServerEvent::MESSAGE_BEFORE, $fd, $server, $frame); + Swoft::trigger(WsServerEvent::MESSAGE_RECEIVE, $fd, $server, $frame); // Parse and dispatch message $dispatcher->dispatch($server, $request, $response); diff --git a/src/websocket-server/src/WebSocketServer.php b/src/websocket-server/src/WebSocketServer.php index fee7c0bc4..8636f3c11 100644 --- a/src/websocket-server/src/WebSocketServer.php +++ b/src/websocket-server/src/WebSocketServer.php @@ -119,7 +119,7 @@ public function sendTo( } $fromUser = $sender < 1 ? 'SYSTEM' : $sender; - $this->log("(private)The #{$fromUser} send message to the user #{$receiver}. Data: {$data}"); + $this->log("(private)The #{$fromUser} send message to the user #{$receiver}. Opcode: $opcode Data: {$data}"); return $this->swooleServer->push($receiver, $data, $opcode, $finish); } @@ -223,7 +223,7 @@ public function broadcast( public function sendToAll($data, int $sender = 0, int $pageSize = 50, int $opcode = WEBSOCKET_OPCODE_TEXT): int { $fromUser = $sender < 1 ? 'SYSTEM' : $sender; - $this->log("(broadcast)The #{$fromUser} send a message to all users. Data: {$data}"); + $this->log("(broadcast)The #{$fromUser} send a message to all users. Opcode: $opcode Data: {$data}"); return $this->pageEach(function (int $fd) use ($data, $opcode) { $this->swooleServer->push($fd, $data, $opcode); @@ -253,7 +253,7 @@ public function sendToSome( // To receivers if ($receivers) { - $this->log("(broadcast)The #{$fromUser} gave some specified user sending a message. Data: {$data}"); + $this->log("(broadcast)The #{$fromUser} gave some specified user sending a message. Opcode: $opcode Data: {$data}"); foreach ($receivers as $fd) { if ($this->swooleServer->isEstablished($fd)) { @@ -268,7 +268,7 @@ public function sendToSome( // To special users $excluded = $excluded ? (array)array_flip($excluded) : []; - $this->log("(broadcast)The #{$fromUser} send the message to everyone except some people. Data: {$data}"); + $this->log("(broadcast)The #{$fromUser} send the message to everyone except some people. Opcode: $opcode Data: {$data}"); return $this->pageEach(function (int $fd) use ($excluded, $data, $opcode) { if (isset($excluded[$fd])) { diff --git a/src/websocket-server/src/WsServerEvent.php b/src/websocket-server/src/WsServerEvent.php index d9c9976c1..ef091e9c1 100644 --- a/src/websocket-server/src/WsServerEvent.php +++ b/src/websocket-server/src/WsServerEvent.php @@ -18,10 +18,19 @@ final class WsServerEvent public const OPEN_AFTER = 'swoft.ws.server.open.after'; public const OPEN_ERROR = 'swoft.ws.server.open.error'; - // On before handle message - public const MESSAGE_BEFORE = 'swoft.ws.server.message.before'; - - // On message send + /** + * @deprecated Please use MESSAGE_RECEIVE instead. + */ + public const MESSAGE_BEFORE = 'swoft.ws.server.message.receive'; + + /** + * On message receive, before handle message + */ + public const MESSAGE_RECEIVE = 'swoft.ws.server.message.receive'; + + /** + * On before send message + */ public const MESSAGE_SEND = 'swoft.ws.server.message.send'; // On after handle message diff --git a/src/websocket-server/test/testing/MockHttpRequest.php b/src/websocket-server/test/testing/MockHttpRequest.php index edbfcdb75..6aeb08d73 100644 --- a/src/websocket-server/test/testing/MockHttpRequest.php +++ b/src/websocket-server/test/testing/MockHttpRequest.php @@ -80,7 +80,7 @@ class MockHttpRequest extends Request * * @return self */ - public static function new(array $server, array $headers, array $cookies, array $params): self + public static function new(array $server = [], array $headers = [], array $cookies = [], array $params = []): self { $instance = new self; diff --git a/src/websocket-server/test/unit/WsMessageDispatcherTest.php b/src/websocket-server/test/unit/WsMessageDispatcherTest.php index 14cf2a4cb..4a2bc5315 100644 --- a/src/websocket-server/test/unit/WsMessageDispatcherTest.php +++ b/src/websocket-server/test/unit/WsMessageDispatcherTest.php @@ -3,6 +3,7 @@ namespace SwoftTest\WebSocket\Server\Unit; use PHPUnit\Framework\TestCase; +use Swoft\WebSocket\Server\WsMessageDispatcher; /** * Class WsMessageDispatcherTest @@ -13,6 +14,7 @@ class WsMessageDispatcherTest extends TestCase */ public function testBasic(): void { + /** @var WsMessageDispatcher $wmd */ $wmd = bean('wsMsgDispatcher'); $this->assertNotEmpty($wmd); } diff --git a/src/websocket-server/test/unit/WsServerTestCase.php b/src/websocket-server/test/unit/WsServerTestCase.php index acc025954..c86f01874 100644 --- a/src/websocket-server/test/unit/WsServerTestCase.php +++ b/src/websocket-server/test/unit/WsServerTestCase.php @@ -3,6 +3,12 @@ namespace SwoftTest\WebSocket\Server\Unit; use PHPUnit\Framework\TestCase; +use Swoft\Http\Message\Request; +use Swoft\Http\Message\Response; +use Swoft\Session\Session; +use Swoft\WebSocket\Server\Connection; +use SwoftTest\WebSocket\Server\Testing\MockHttpRequest; +use SwoftTest\WebSocket\Server\Testing\MockHttpResponse; use SwoftTest\WebSocket\Server\Testing\MockWsServer; /** @@ -21,4 +27,17 @@ public function setUp(): void { $this->wsServer = new MockWsServer(); } + + public function addSession(int $fd): void + { + $req = MockHttpRequest::new(); + $res = MockHttpResponse::new(); + + $psrReq = Request::new($req); + $psrRes = Response::new($res); + + $conn = Connection::new($fd, $psrReq, $psrRes); + + Session::set((string)$fd, $conn); + } }