Skip to content

Commit

Permalink
update some logic ofr ws server
Browse files Browse the repository at this point in the history
  • Loading branch information
inhere committed Aug 9, 2019
1 parent 8f7df59 commit 3a8890c
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 21 deletions.
12 changes: 12 additions & 0 deletions src/websocket-server/src/Listener/WorkerEndSubscriber.php
Expand Up @@ -2,6 +2,7 @@

namespace Swoft\WebSocket\Server\Listener;

use Swoft\Config\Annotation\Mapping\Config;
use Swoft\Event\Annotation\Mapping\Subscriber;
use Swoft\Event\EventInterface;
use Swoft\Event\EventSubscriberInterface;
Expand All @@ -18,6 +19,12 @@
*/
class WorkerEndSubscriber implements EventSubscriberInterface
{
/**
* @Config("websocket.autoCloseOnWorkerEnd")
* @var int
*/
private $autoCloseConnection = 0;

/**
* Configure events and corresponding processing methods (you can configure the priority)
*
Expand All @@ -39,6 +46,11 @@ public static function getSubscribedEvents(): array
*/
public function onWorkerStop(EventInterface $event): void
{
// If not enable
if ($this->autoCloseConnection === 0) {
return;
}

/** @var WebSocketServer $server */
$server = $event->getTarget();

Expand Down
38 changes: 31 additions & 7 deletions src/websocket-server/src/Message/Response.php
Expand Up @@ -5,6 +5,7 @@
use Swoft\Bean\Annotation\Mapping\Bean;
use Swoft\Context\Context;
use Swoft\Exception\SwoftException;
use Swoft\Log\Helper\CLog;
use Swoft\Server\Concern\CommonProtocolDataTrait;
use Swoft\Session\Session;
use Swoft\WebSocket\Server\Connection;
Expand Down Expand Up @@ -96,9 +97,11 @@ public static function new(int $sender = -1): self
{
$self = bean(self::class);

// Set properties
$self->sent = false;
$self->sender = $sender;
// Use sender as default receiver
$self->fd = $sender;
$self->sent = false;
$self->sender = $sender;

$self->sendToAll = false;

return $self;
Expand Down Expand Up @@ -190,11 +193,22 @@ public function send(Connection $conn = null): int
return $server->sendToAll($content, $this->sender, $pageSize, $this->opcode);
}

// To some users
// To special users
if ($this->fds) {
return $server->sendToSome($content, $this->fds, $this->noFds, $this->sender, $pageSize, $this->opcode);
}

// Except some users
if ($this->noFds) {
return $server->sendToSome($content, [], $this->noFds, $this->sender, $pageSize, $this->opcode);
}

// No receiver
if ($this->fd < 1) {
CLog::warning('no receiver for the response message');
return 0;
}

// To one user
$ok = $server->sendTo($this->fd, $content, $this->sender, $this->opcode, $this->finish);

Expand Down Expand Up @@ -246,7 +260,7 @@ public function getFd(): int
*
* @return self
*/
public function setFd(int $fd): ResponseInterface
public function setFd(int $fd): self
{
$this->fd = $fd;
return $this;
Expand All @@ -265,7 +279,7 @@ public function getOpcode(): int
*
* @return self
*/
public function setOpcode(int $opcode): ResponseInterface
public function setOpcode(int $opcode): self
{
if ($opcode > 0 && $opcode < 11) {
$this->opcode = $opcode;
Expand All @@ -287,7 +301,7 @@ public function isFinish(): bool
*
* @return self
*/
public function setFinish(bool $finish): ResponseInterface
public function setFinish(bool $finish): self
{
$this->finish = $finish;
return $this;
Expand All @@ -312,6 +326,15 @@ public function setSender(int $sender): ResponseInterface
return $this;
}

/**
* @return Response|self
*/
public function noSender(): self
{
$this->sender = 0;
return $this;
}

/**
* @return string
*/
Expand All @@ -322,6 +345,7 @@ public function getContent(): string

/**
* @param string $content
*
* @return Response|ResponseInterface
*/
public function setContent(string $content): ResponseInterface
Expand Down
4 changes: 0 additions & 4 deletions src/websocket-server/src/Swoole/HandshakeListener.php
Expand Up @@ -2,11 +2,9 @@

namespace Swoft\WebSocket\Server\Swoole;

use ReflectionException;
use Swoft;
use Swoft\Bean\Annotation\Mapping\Bean;
use Swoft\Bean\BeanFactory;
use Swoft\Bean\Exception\ContainerException;
use Swoft\Co;
use Swoft\Context\Context;
use Swoft\Http\Message\Request as Psr7Request;
Expand Down Expand Up @@ -42,8 +40,6 @@ class HandshakeListener implements HandshakeInterface
* @param Response $response
*
* @return bool
* @throws ReflectionException
* @throws ContainerException
* @throws Throwable
*/
public function onHandshake(Request $request, Response $response): bool
Expand Down
2 changes: 1 addition & 1 deletion src/websocket-server/src/Swoole/MessageListener.php
Expand Up @@ -58,7 +58,7 @@ public function onMessage(Server $server, Frame $frame): void

try {
// Trigger message before event
Swoft::trigger(WsServerEvent::MESSAGE_BEFORE, $fd, $server, $frame);
Swoft::trigger(WsServerEvent::MESSAGE_RECEIVE, $fd, $server, $frame);

// Parse and dispatch message
$dispatcher->dispatch($server, $request, $response);
Expand Down
8 changes: 4 additions & 4 deletions src/websocket-server/src/WebSocketServer.php
Expand Up @@ -119,7 +119,7 @@ public function sendTo(
}

$fromUser = $sender < 1 ? 'SYSTEM' : $sender;
$this->log("(private)The #{$fromUser} send message to the user #{$receiver}. Data: {$data}");
$this->log("(private)The #{$fromUser} send message to the user #{$receiver}. Opcode: $opcode Data: {$data}");

return $this->swooleServer->push($receiver, $data, $opcode, $finish);
}
Expand Down Expand Up @@ -223,7 +223,7 @@ public function broadcast(
public function sendToAll($data, int $sender = 0, int $pageSize = 50, int $opcode = WEBSOCKET_OPCODE_TEXT): int
{
$fromUser = $sender < 1 ? 'SYSTEM' : $sender;
$this->log("(broadcast)The #{$fromUser} send a message to all users. Data: {$data}");
$this->log("(broadcast)The #{$fromUser} send a message to all users. Opcode: $opcode Data: {$data}");

return $this->pageEach(function (int $fd) use ($data, $opcode) {
$this->swooleServer->push($fd, $data, $opcode);
Expand Down Expand Up @@ -253,7 +253,7 @@ public function sendToSome(

// To receivers
if ($receivers) {
$this->log("(broadcast)The #{$fromUser} gave some specified user sending a message. Data: {$data}");
$this->log("(broadcast)The #{$fromUser} gave some specified user sending a message. Opcode: $opcode Data: {$data}");

foreach ($receivers as $fd) {
if ($this->swooleServer->isEstablished($fd)) {
Expand All @@ -268,7 +268,7 @@ public function sendToSome(
// To special users
$excluded = $excluded ? (array)array_flip($excluded) : [];

$this->log("(broadcast)The #{$fromUser} send the message to everyone except some people. Data: {$data}");
$this->log("(broadcast)The #{$fromUser} send the message to everyone except some people. Opcode: $opcode Data: {$data}");

return $this->pageEach(function (int $fd) use ($excluded, $data, $opcode) {
if (isset($excluded[$fd])) {
Expand Down
17 changes: 13 additions & 4 deletions src/websocket-server/src/WsServerEvent.php
Expand Up @@ -18,10 +18,19 @@ final class WsServerEvent
public const OPEN_AFTER = 'swoft.ws.server.open.after';
public const OPEN_ERROR = 'swoft.ws.server.open.error';

// On before handle message
public const MESSAGE_BEFORE = 'swoft.ws.server.message.before';

// On message send
/**
* @deprecated Please use MESSAGE_RECEIVE instead.
*/
public const MESSAGE_BEFORE = 'swoft.ws.server.message.receive';

/**
* On message receive, before handle message
*/
public const MESSAGE_RECEIVE = 'swoft.ws.server.message.receive';

/**
* On before send message
*/
public const MESSAGE_SEND = 'swoft.ws.server.message.send';

// On after handle message
Expand Down
2 changes: 1 addition & 1 deletion src/websocket-server/test/testing/MockHttpRequest.php
Expand Up @@ -80,7 +80,7 @@ class MockHttpRequest extends Request
*
* @return self
*/
public static function new(array $server, array $headers, array $cookies, array $params): self
public static function new(array $server = [], array $headers = [], array $cookies = [], array $params = []): self
{
$instance = new self;

Expand Down
2 changes: 2 additions & 0 deletions src/websocket-server/test/unit/WsMessageDispatcherTest.php
Expand Up @@ -3,6 +3,7 @@
namespace SwoftTest\WebSocket\Server\Unit;

use PHPUnit\Framework\TestCase;
use Swoft\WebSocket\Server\WsMessageDispatcher;

/**
* Class WsMessageDispatcherTest
Expand All @@ -13,6 +14,7 @@ class WsMessageDispatcherTest extends TestCase
*/
public function testBasic(): void
{
/** @var WsMessageDispatcher $wmd */
$wmd = bean('wsMsgDispatcher');
$this->assertNotEmpty($wmd);
}
Expand Down
19 changes: 19 additions & 0 deletions src/websocket-server/test/unit/WsServerTestCase.php
Expand Up @@ -3,6 +3,12 @@
namespace SwoftTest\WebSocket\Server\Unit;

use PHPUnit\Framework\TestCase;
use Swoft\Http\Message\Request;
use Swoft\Http\Message\Response;
use Swoft\Session\Session;
use Swoft\WebSocket\Server\Connection;
use SwoftTest\WebSocket\Server\Testing\MockHttpRequest;
use SwoftTest\WebSocket\Server\Testing\MockHttpResponse;
use SwoftTest\WebSocket\Server\Testing\MockWsServer;

/**
Expand All @@ -21,4 +27,17 @@ public function setUp(): void
{
$this->wsServer = new MockWsServer();
}

public function addSession(int $fd): void
{
$req = MockHttpRequest::new();
$res = MockHttpResponse::new();

$psrReq = Request::new($req);
$psrRes = Response::new($res);

$conn = Connection::new($fd, $psrReq, $psrRes);

Session::set((string)$fd, $conn);
}
}

0 comments on commit 3a8890c

Please sign in to comment.