Skip to content

Commit

Permalink
* Added special exception types
Browse files Browse the repository at this point in the history
 * Added optional parameter to allow disabling creating unix-socket connector
 * Added optional parameter to allow disabling exchange data between workers
  • Loading branch information
Alex committed Mar 19, 2018
1 parent 2b52cee commit c732b1a
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 24 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Expand Up @@ -4,6 +4,7 @@ sudo: false

php:
- 7.1
- 7.2

before_script:
- curl --version
Expand Down
3 changes: 2 additions & 1 deletion composer.json
Expand Up @@ -27,7 +27,8 @@
},
"autoload": {
"psr-4": {
"Ollyxar\\WebSockets\\": "src/"
"Ollyxar\\WebSockets\\": "src/",
"Ollyxar\\WebSockets\\Exceptions\\": "src/Exceptions/"
}
},
"autoload-dev": {
Expand Down
9 changes: 9 additions & 0 deletions src/Exceptions/ForkException.php
@@ -0,0 +1,9 @@
<?php namespace Ollyxar\WebSockets\Exceptions;

/**
* Class ForkException
* @package Ollyxar\WebSockets\Exceptions
*/
class ForkException extends \Exception
{
}
9 changes: 9 additions & 0 deletions src/Exceptions/SocketException.php
@@ -0,0 +1,9 @@
<?php namespace Ollyxar\WebSockets\Exceptions;

/**
* Class SocketException
* @package Ollyxar\WebSockets\Exceptions
*/
class SocketException extends \Exception
{
}
6 changes: 3 additions & 3 deletions src/Handler.php
@@ -1,6 +1,5 @@
<?php namespace Ollyxar\WebSockets;

use Exception;
use Generator;

/**
Expand Down Expand Up @@ -28,7 +27,8 @@ public function __construct($server, $master)
}

/**
* Sending message to all connected users
* Sending message to all connected users.
* If $global is true then worker will send data to master to retransmit data to all workers
*
* @param string $msg
* @param bool $global
Expand Down Expand Up @@ -84,7 +84,7 @@ private function handshake($socket): Generator
try {
yield Dispatcher::async($this->write($socket, $response));
yield Dispatcher::async($this->afterHandshake($headers, $socket));
} catch (Exception $e) {
} catch (\Throwable $e) {
return yield;
}
}
Expand Down
28 changes: 22 additions & 6 deletions src/Master.php
Expand Up @@ -18,16 +18,25 @@ final class Master
*/
private $connector;

/**
* If true then Master will retransmit data from one worker to others
*
* @var bool
*/
private $exchangeWorkersData = true;

/**
* Master constructor.
*
* @param $workers
* @param $connector
* @param null $connector
* @param bool $exchangeWorkersData
*/
public function __construct($workers, $connector)
public function __construct($workers, $connector = null, $exchangeWorkersData = true)
{
$this->workers = $workers;
$this->connector = $connector;
$this->exchangeWorkersData = $exchangeWorkersData;
}

/**
Expand Down Expand Up @@ -111,9 +120,16 @@ protected function listenConnector(): Generator
*/
public function dispatch(): void
{
(new Dispatcher())
->add($this->listenConnector())
->add($this->listenWorkers())
->dispatch();
$dispatcher = new Dispatcher();

if ($this->connector) {
$dispatcher->add($this->listenConnector());
}

if ($this->exchangeWorkersData) {
$dispatcher->add($this->listenWorkers());
}

$dispatcher->dispatch();
}
}
42 changes: 28 additions & 14 deletions src/Server.php
@@ -1,6 +1,9 @@
<?php namespace Ollyxar\WebSockets;

use Exception;
use Ollyxar\WebSockets\Exceptions\{
ForkException,
SocketException
};

/**
* Class Server
Expand All @@ -13,6 +16,8 @@ class Server
protected $host = '0.0.0.0';
protected $port = 2083;
protected $useSSL = false;
protected $useConnector = true;
protected $exchangeWorkersData = true;
protected $cert;
protected $passPhrase;
protected $workerCount = 4;
Expand All @@ -26,27 +31,27 @@ class Server
* @param int $port
* @param int $workerCount
* @param bool $useSSL
* @param bool $useConnector
* @param bool $exchangeWorkersData
*/
public function __construct(string $host, int $port, int $workerCount = 4, $useSSL = false)
public function __construct(string $host, int $port, int $workerCount = 4, $useSSL = false, $useConnector = true, $exchangeWorkersData = true)
{
$this->host = $host;
$this->port = $port;
$this->useSSL = $useSSL;
$this->useConnector = $useConnector;
$this->exchangeWorkersData = $exchangeWorkersData;
$this->workerCount = $workerCount;
}

/**
* Make server sockets
*
* @throws Exception
* @throws SocketException
* @return void
*/
private function makeSocket(): void
{
if (file_exists(static::$connector)) {
unlink(static::$connector);
}

if ($this->useSSL) {
$context = stream_context_create([
'ssl' => [
Expand All @@ -67,19 +72,24 @@ private function makeSocket(): void

$this->socket = stream_socket_server("$protocol://{$this->host}:{$this->port}", $errorNumber, $errorString, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN, $context);

$this->unixConnector = stream_socket_server('unix://' . static::$connector, $errorNumber, $errorString, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN);
if ($this->useConnector) {
if (file_exists(static::$connector)) {
unlink(static::$connector);
}

chmod(static::$connector, 0777);
$this->unixConnector = stream_socket_server('unix://' . static::$connector, $errorNumber, $errorString, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN);
chmod(static::$connector, 0777);
}

if (!$this->socket) {
throw new Exception($errorString, $errorNumber);
throw new SocketException($errorString, $errorNumber);
}
}

/**
* Spawning process to avoid system limits and increase performance
*
* @throws Exception
* @throws ForkException
* @return array
*/
private function spawn(): array
Expand All @@ -93,7 +103,7 @@ private function spawn(): array
$pid = pcntl_fork();

if ($pid == -1) {
throw new Exception('Cannot fork process');
throw new ForkException('Cannot fork process');
} elseif ($pid) {
fclose($pair[0]);
$workers[$pid] = $pair[1];
Expand Down Expand Up @@ -150,7 +160,8 @@ public function setPassPhrase(string $passPhrase = 'abracadabra'): self
* Launching server
*
* @return void
* @throws Exception
* @throws ForkException
* @throws SocketException
*/
public function run(): void
{
Expand All @@ -162,7 +173,10 @@ public function run(): void
fclose($this->socket);
(new Master($workers, $this->unixConnector))->dispatch();
} else {
fclose($this->unixConnector);
if ($this->useConnector) {
fclose($this->unixConnector);
}

(new $this->handler($this->socket, $master))->handle();
}
}
Expand Down

0 comments on commit c732b1a

Please sign in to comment.