diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..98cae90 --- /dev/null +++ b/.gitattributes @@ -0,0 +1,6 @@ +.gitattributes export-ignore +.gitignore export-ignore +docs/ export-ignore +Makefile export-ignore +phpunit.xml.dist export-ignore +tests/ export-ignore diff --git a/.github/workflows/acceptance.yml b/.github/workflows/acceptance.yml index 4b4dbbb..c604970 100644 --- a/.github/workflows/acceptance.yml +++ b/.github/workflows/acceptance.yml @@ -63,16 +63,31 @@ jobs: - name: Test run: make test + test-8-3: + runs-on: ubuntu-latest + name: Test PHP 8.3 + steps: + - name: Checkout + uses: actions/checkout@v3 + - name: Set up PHP 8.3 + uses: shivammathur/setup-php@v2 + with: + php-version: '8.3' + - name: Composer + run: make deps-install + - name: Test + run: make test + cs-check: runs-on: ubuntu-latest name: Code standard steps: - name: Checkout uses: actions/checkout@v3 - - name: Set up PHP 8.1 + - name: Set up PHP 8.2 uses: shivammathur/setup-php@v2 with: - php-version: '8.1' + php-version: '8.2' - name: Composer run: make deps-install - name: Code standard @@ -84,10 +99,10 @@ jobs: steps: - name: Checkout uses: actions/checkout@v3 - - name: Set up PHP 8.1 + - name: Set up PHP 8.2 uses: shivammathur/setup-php@v2 with: - php-version: '8.1' + php-version: '8.2' extensions: xdebug - name: Composer run: make deps-install diff --git a/README.md b/README.md index 4577b53..504b4f4 100644 --- a/README.md +++ b/README.md @@ -44,6 +44,10 @@ class Stream implements StreamInterface { public function read(int $length): string; public function getContents(): string; public function getMetadata(?string $key = null): mixed; + + // Additional methods + + public function getResource(): resource; } ``` @@ -56,9 +60,34 @@ class SocketStream extends Stream { // Methods + public function isConnected(): bool; // If stream is connected to remote public function getRemoteName(): ?string; // Returns remote name + public function getLocalName(): ?string; // Returns local name + public function getResourceType(): string; // Get resource type public function isBlocking(): ?bool; // If stream is blocking or not public function setBlocking(bool $enable): bool; // Change blocking mode + public function setTimeout(int $seconds, int $microseconds = 0): bool; // Set timeout + public function readLine(int $length): ?string // Read a line from stream, up to $length bytes +} +``` + +## SockeClient class + +The `Phrity\Net\SockeClient` class enables a client for remote socket. + +```php +class SocketClient { + + // Constructor + + public function __construct(UriInterface $uri); + + // Methods + + public function setPersistent(bool $persistent): self; // If client should use persisten connection + public function setTimeout(?int $timeout): self; // Set timeout + public function setContext(?array $options = null, ?array $params = null): self; // Set stream context + public function connect(): ?SocketStream; // Connect to remote } ``` @@ -76,7 +105,7 @@ class SocketServer extends Stream { // Methods public function accept(?int $timeout = null): ?SocketStream; // Accept connection on socket server - public function getTransports(): array; // Ge available transports + public function getTransports(): array; // Get available transports public function isBlocking(): ?bool; // If stream is blocking or not public function setBlocking(bool $enable): bool; // Change blocking mode } @@ -136,14 +165,29 @@ class StreamFactory implements StreamFactoryInterface { // Additional methods public function createSocketStreamFromResource($resource): SocketStream; // Create a socket stream + public function createSocketClient(UriInterface $uri): SocketClient; / Create socket client public function createSocketServer(UriInterface $uri, int $flags = STREAM_SERVER_BIND | STREAM_SERVER_LISTEN): SocketServer; // Create a socket server public function createStreamCollection(): StreamCollection; // Create a stream collection } ``` +## StreamException class + +The `Phrity\Net\StreamException` is thrown when astream related error occurs. + +```php +class StreamException extends RuntimeException { + + // Constructor + + public function __construct(int $code, array $data = [], ?Throwable $previous = null) +} +``` + ## Versions | Version | PHP | | | --- | --- | --- | -| `1.0` | `^7.4\|^8.0` | Initial version | +| `1.2` | `^7.4\|^8.0` | Socket client | | `1.1` | `^7.4\|^8.0` | Stream collection | +| `1.0` | `^7.4\|^8.0` | Initial version | diff --git a/composer.json b/composer.json index 6296493..5865902 100644 --- a/composer.json +++ b/composer.json @@ -1,37 +1,37 @@ { - "name": "phrity/net-stream", - "type": "library", - "description": "Socket stream classes implementing PSR-7 Stream and PSR-17 StreamFactory", - "homepage": "https://phrity.sirn.se/net-stream", - "keywords": ["socket", "stream", "stream factory", "client", "server", "PSR-7", "PSR-17"], - "license": "MIT", - "authors": [ - { - "name": "Sören Jensen", - "email": "sirn@sirn.se", - "homepage": "https://phrity.sirn.se" + "name": "phrity/net-stream", + "type": "library", + "description": "Socket stream classes implementing PSR-7 Stream and PSR-17 StreamFactory", + "homepage": "https://phrity.sirn.se/net-stream", + "keywords": ["socket", "stream", "stream factory", "client", "server", "PSR-7", "PSR-17"], + "license": "MIT", + "authors": [ + { + "name": "Sören Jensen", + "email": "sirn@sirn.se", + "homepage": "https://phrity.sirn.se" + } + ], + "autoload": { + "psr-4": { + "Phrity\\Net\\": "src/" + } + }, + "autoload-dev": { + "psr-4": { + "Phrity\\Net\\Test\\": "tests/" + } + }, + "require": { + "php": "^7.4 | ^8.0", + "phrity/util-errorhandler": "^1.0", + "psr/http-factory": "^1.0", + "psr/http-message": "^1.0 | ^2.0" + }, + "require-dev": { + "phpunit/phpunit": "^9.0 | ^10.0", + "php-coveralls/php-coveralls": "^2.0", + "phrity/net-uri": "^1.1", + "squizlabs/php_codesniffer": "^3.0" } - ], - "autoload": { - "psr-4": { - "": "src/" - } - }, - "autoload-dev": { - "psr-4": { - "Phrity\\Net\\": "tests/classes/" - } - }, - "require": { - "php": "^7.4|^8.0", - "phrity/util-errorhandler": "^1.0", - "psr/http-factory": "^1.0", - "psr/http-message": "^1.0" - }, - "require-dev": { - "phpunit/phpunit": "^9.0", - "php-coveralls/php-coveralls": "^2.0", - "phrity/net-uri": "^1.1", - "squizlabs/php_codesniffer": "^3.0" - } -} \ No newline at end of file +} diff --git a/src/Phrity/Net/SocketServer.php b/src/Phrity/Net/SocketServer.php deleted file mode 100644 index 4c64a73..0000000 --- a/src/Phrity/Net/SocketServer.php +++ /dev/null @@ -1,109 +0,0 @@ - Net > Stream - */ - -namespace Phrity\Net; - -use ErrorException; -use Phrity\Util\ErrorHandler; -use Psr\Http\Message\{ - StreamInterface, - UriInterface -}; -use RuntimeException; - -/** - * Net\SocketServer class. - */ -class SocketServer extends Stream -{ - private static $internet_schemes = ['tcp', 'udp', 'tls', 'ssl']; - private static $unix_schemes = ['unix', 'udg']; - - protected $handler; - protected $stream; - - /** - * Create new socker server instance - * \Psr\Http\Message\UriInterface $uri The URI to open socket on. - * int $flags Flags to set on socket. - * @throws \RuntimeException if unable to create socket. - */ - public function __construct(UriInterface $uri, int $flags = STREAM_SERVER_BIND | STREAM_SERVER_LISTEN) - { - $this->handler = new ErrorHandler(); - if (!in_array($uri->getScheme(), $this->getTransports())) { - throw new RuntimeException("Scheme '{$uri->getScheme()}' is not supported."); - } - if (in_array(substr($uri->getScheme(), 0, 3), self::$internet_schemes)) { - $address = "{$uri->getScheme()}://{$uri->getAuthority()}"; - } elseif (in_array($uri->getScheme(), self::$unix_schemes)) { - $address = "{$uri->getScheme()}://{$uri->getPath()}"; - } else { - throw new RuntimeException("Could not handle scheme '{$uri->getScheme()}'."); - } - $this->stream = $this->handler->with(function () use ($address, $flags) { - $error_code = $error_message = ''; - return stream_socket_server($address, $error_code, $error_message, $flags); - }, new RuntimeException("Could not create socket for '{$uri}'.")); - } - - /** - * Accept a connection on a socket. - * @param int|null $timeout Override the default socket accept timeout. - * @return \Psr\Http\Message\StreamInterface|null The stream for opened conenction. - * @throws \RuntimeException if socket is closed - */ - public function accept(?int $timeout = null): ?SocketStream - { - if (!isset($this->stream)) { - throw new RuntimeException("Server is closed."); - } - $stream = $this->handler->with(function () use ($timeout) { - $peer_name = ''; - return stream_socket_accept($this->stream, $timeout, $peer_name); - }, function (ErrorException $e) { - // If non-blocking mode, don't throw error on time out - if ($this->getMetadata('blocked') === false && substr_count($e->getMessage(), 'timed out') > 0) { - return null; - } - throw new RuntimeException("Could not accept on socket."); - }); - return $stream ? new SocketStream($stream) : null; - } - - /** - * Retrieve list of registered socket transports. - * @return array List of registered transports. - */ - public function getTransports(): array - { - return stream_get_transports(); - } - - /** - * If server is in blocking mode. - * @return bool|null - */ - public function isBlocking(): ?bool - { - return $this->getMetadata('blocked'); - } - - /** - * Toggle blocking/non-blocking mode. - * @param bool $enable Blocking mode to set. - * @return bool If operation was succesful. - * @throws \RuntimeException if socket is closed. - */ - public function setBlocking(bool $enable): bool - { - if (!isset($this->stream)) { - throw new RuntimeException("Server is closed."); - } - return stream_set_blocking($this->stream, $enable); - } -} diff --git a/src/Phrity/Net/SocketStream.php b/src/Phrity/Net/SocketStream.php deleted file mode 100644 index dbdaf76..0000000 --- a/src/Phrity/Net/SocketStream.php +++ /dev/null @@ -1,48 +0,0 @@ - Net > Stream - */ - -namespace Phrity\Net; - -use RuntimeException; - -/** - * Net\SocketStream class. - */ -class SocketStream extends Stream -{ - /** - * Get name of remote socket, or null if not connected. - * @return string|null - */ - public function getRemoteName(): ?string - { - return stream_socket_get_name($this->stream, true); - } - - /** - * If stream is in blocking mode. - * @return bool|null - */ - public function isBlocking(): ?bool - { - return $this->getMetadata('blocked'); - } - - /** - * Toggle blocking/non-blocking mode. - * @param bool $enable Blocking mode to set. - * @return bool If operation was succesful. - * @throws \RuntimeException if stream is closed. - */ - public function setBlocking(bool $enable): bool - { - if (!isset($this->stream)) { - throw new RuntimeException("Stream is detached."); - } - return stream_set_blocking($this->stream, $enable); - } -} diff --git a/src/SocketClient.php b/src/SocketClient.php new file mode 100644 index 0000000..209a4e3 --- /dev/null +++ b/src/SocketClient.php @@ -0,0 +1,89 @@ +uri = $uri; + $this->handler = new ErrorHandler(); + } + + + // ---------- Configuration --------------------------------------------------------------------------------------- + + /** + * Set stream context. + * @param array|null $options + * @param array|null $params + * @return \Phrity\Net\SocketClient + */ + public function setContext(?array $options = null, ?array $params = null): self + { + $this->context = stream_context_create($options, $params); + return $this; + } + + /** + * Set connection persistency. + * @param bool $persistent + * @return \Phrity\Net\SocketClient + */ + public function setPersistent(bool $persistent): self + { + $this->persistent = $persistent; + return $this; + } + + /** + * Set timeout in seconds. + * @param int|null $timeout + * @return \Phrity\Net\SocketClient + */ + public function setTimeout(?int $timeout): self + { + $this->timeout = $timeout; + return $this; + } + + + // ---------- Operations ------------------------------------------------------------------------------------------ + + /** + * Create a connection on remote socket. + * @return \Phrity\Net\SocketStream The stream for opened conenction. + * @throws StreamException if connection could not be created + */ + public function connect(): SocketStream + { + $stream = $this->handler->with(function () { + $error_code = $error_message = ''; + return stream_socket_client( + $this->uri->__toString(), + $error_code, + $error_message, + $this->timeout, + $this->persistent ? STREAM_CLIENT_CONNECT | STREAM_CLIENT_PERSISTENT : STREAM_CLIENT_CONNECT, + $this->context + ); + }, new StreamException(StreamException::CLIENT_CONNECT_ERR, ['uri' => $this->uri])); + return new SocketStream($stream); + } +} diff --git a/src/SocketServer.php b/src/SocketServer.php new file mode 100644 index 0000000..ce9be8d --- /dev/null +++ b/src/SocketServer.php @@ -0,0 +1,130 @@ +handler = new ErrorHandler(); + if (!in_array($uri->getScheme(), $this->getTransports())) { + throw new StreamException(StreamException::SCHEME_TRANSPORT, ['scheme' => $uri->getScheme()]); + } + if (in_array(substr($uri->getScheme(), 0, 3), self::$internet_schemes)) { + $this->address = "{$uri->getScheme()}://{$uri->getAuthority()}"; + } elseif (in_array($uri->getScheme(), self::$unix_schemes)) { + $this->address = "{$uri->getScheme()}://{$uri->getPath()}"; + } else { + throw new StreamException(StreamException::SCHEME_HANDLER, ['scheme' => $uri->getScheme()]); + } + $this->stream = $this->handler->with(function () use ($flags) { + $error_code = $error_message = ''; + return stream_socket_server($this->address, $error_code, $error_message, $flags); + }, new StreamException(StreamException::SERVER_SOCKET_ERR, ['uri' => $uri->__toString()])); + $this->evalStream(); + } + + + // ---------- Configuration --------------------------------------------------------------------------------------- + + /** + * Retrieve list of registered socket transports. + * @return array List of registered transports. + */ + public function getTransports(): array + { + return stream_get_transports(); + } + + /** + * If server is in blocking mode. + * @return bool|null + */ + public function isBlocking(): ?bool + { + return $this->getMetadata('blocked'); + } + + /** + * Toggle blocking/non-blocking mode. + * @param bool $enable Blocking mode to set. + * @return bool If operation was succesful. + * @throws StreamException if socket is closed. + */ + public function setBlocking(bool $enable): bool + { + if (!isset($this->stream)) { + throw new StreamException(StreamException::SERVER_CLOSED); + } + return stream_set_blocking($this->stream, $enable); + } + + /** + * Get stream metadata as an associative array or retrieve a specific key. + * @param string $key Specific metadata to retrieve. + * @return array|mixed|null Returns an associative array if no key is + * provided. Returns a specific key value if a key is provided and the + * value is found, or null if the key is not found. + */ + public function getMetadata($key = null) + { + if (!isset($this->stream)) { + return null; + } + // Add URI default for version compability + $meta = array_merge([ + 'uri' => $this->address, + ], stream_get_meta_data($this->stream)); + if (isset($key)) { + return array_key_exists($key, $meta) ? $meta[$key] : null; + } + return $meta; + } + + + // ---------- Operations ------------------------------------------------------------------------------------------ + + /** + * Accept a connection on a socket. + * @param int|null $timeout Override the default socket accept timeout. + * @return Phrity\Net\SocketStream|null The stream for opened conenction. + * @throws StreamException if socket is closed + */ + public function accept(?int $timeout = null): ?SocketStream + { + if (!isset($this->stream)) { + throw new StreamException(StreamException::SERVER_CLOSED); + } + $stream = $this->handler->with(function () use ($timeout) { + $peer_name = ''; + return stream_socket_accept($this->stream, $timeout, $peer_name); + }, function (ErrorException $e) { + // If non-blocking mode, don't throw error on time out + if ($this->getMetadata('blocked') === false && substr_count($e->getMessage(), 'timed out') > 0) { + return null; + } + throw new StreamException(StreamException::SERVER_ACCEPT_ERR); + }); + return $stream ? new SocketStream($stream) : null; + } +} diff --git a/src/SocketStream.php b/src/SocketStream.php new file mode 100644 index 0000000..5c75bbe --- /dev/null +++ b/src/SocketStream.php @@ -0,0 +1,108 @@ +stream && ($this->readable || $this->writable); + } + + /** + * Get name of remote socket, or null if not connected. + * @return string|null + */ + public function getRemoteName(): ?string + { + return stream_socket_get_name($this->stream, true); + } + + /** + * Get name of local socket, or null if not connected. + * @return string|null + */ + public function getLocalName(): ?string + { + return stream_socket_get_name($this->stream, false); + } + + /** + * Get type of stream resoucre. + * @return string + */ + public function getResourceType(): string + { + return $this->stream ? get_resource_type($this->stream) : ''; + } + + /** + * If stream is in blocking mode. + * @return bool|null + */ + public function isBlocking(): ?bool + { + return $this->getMetadata('blocked'); + } + + /** + * Toggle blocking/non-blocking mode. + * @param bool $enable Blocking mode to set. + * @return bool If operation was succesful. + * @throws \StreamException if stream is closed. + */ + public function setBlocking(bool $enable): bool + { + if (!isset($this->stream)) { + throw new StreamException(StreamException::STREAM_DETACHED); + } + return stream_set_blocking($this->stream, $enable); + } + + /** + * Set timeout period on a stream. + * @param int $seconds Seconds to be set. + * @param int $microseconds Microseconds to be set. + * @return bool If operation was succesful. + * @throws \StreamException if stream is closed. + */ + public function setTimeout(int $seconds, int $microseconds = 0): bool + { + if (!isset($this->stream)) { + throw new StreamException(StreamException::STREAM_DETACHED); + } + return stream_set_timeout($this->stream, $seconds, $microseconds); + } + + + // ---------- Operations ------------------------------------------------------------------------------------------ + + /** + * Read line from the stream. + * @param int $length Read up to $length bytes from the object and return them. + * @return string|null Returns the data read from the stream, or null of eof. + * @throws \StreamException if an error occurs. + */ + public function readLine(int $length): ?string + { + if (!isset($this->stream)) { + throw new StreamException(StreamException::STREAM_DETACHED); + } + if (!$this->readable) { + throw new StreamException(StreamException::NOT_READABLE); + } + return $this->handler->with(function () use ($length) { + $result = fgets($this->stream, $length); + return $result === false ? null : $result; + }, new StreamException(StreamException::FAIL_GETS)); + } +} diff --git a/src/Phrity/Net/Stream.php b/src/Stream.php similarity index 78% rename from src/Phrity/Net/Stream.php rename to src/Stream.php index 8c58d41..6b9bc31 100644 --- a/src/Phrity/Net/Stream.php +++ b/src/Stream.php @@ -1,22 +1,16 @@ Net > Stream - * @see https://www.php-fig.org/psr/psr-7/#34-psrhttpmessagestreaminterface - */ - namespace Phrity\Net; use InvalidArgumentException; use Phrity\Util\ErrorHandler; use Psr\Http\Message\StreamInterface; -use RuntimeException; use Throwable; /** - * Net\Stream class. - */ + * Phrity\Net\Stream class. + * @see https://www.php-fig.org/psr/psr-7/#34-psrhttpmessagestreaminterface +*/ class Stream implements StreamInterface { private static $readmodes = ['r', 'r+', 'w+', 'a+', 'x+', 'c+']; @@ -60,7 +54,7 @@ public function close(): void if (isset($this->stream)) { fclose($this->stream); } - unset($this->stream); + $this->stream = null; $this->evalStream(); } @@ -75,7 +69,7 @@ public function detach() return null; } $stream = $this->stream; - unset($this->stream); + $this->stream = null; $this->evalStream(); return $stream; } @@ -102,16 +96,16 @@ public function getMetadata($key = null) /** * Returns the current position of the file read/write pointer * @return int Position of the file pointer - * @throws \RuntimeException on error. + * @throws \StreamException on error. */ public function tell(): int { if (!isset($this->stream)) { - throw new RuntimeException('Stream is detached.'); + throw new StreamException(StreamException::STREAM_DETACHED); } return $this->handler->with(function () { return ftell($this->stream); - }, new RuntimeException('Failed tell() on stream.')); + }, new StreamException(StreamException::FAIL_TELL)); } /** @@ -127,38 +121,38 @@ public function eof(): bool * Read data from the stream. * @param int $length Read up to $length bytes from the object and return them. * @return string Returns the data read from the stream, or an empty string. - * @throws \RuntimeException if an error occurs. + * @throws \StreamException if an error occurs. */ public function read($length): string { if (!isset($this->stream)) { - throw new RuntimeException('Stream is detached.'); + throw new StreamException(StreamException::STREAM_DETACHED); } if (!$this->readable) { - throw new RuntimeException('Stream is not readable.'); + throw new StreamException(StreamException::NOT_READABLE); } return $this->handler->with(function () use ($length) { - return fread($this->stream, $length); - }, new RuntimeException('Failed read() on stream.')); + return (string)fread($this->stream, $length); + }, new StreamException(StreamException::FAIL_READ)); } /** * Write data to the stream. * @param string $string The string that is to be written. * @return int Returns the number of bytes written to the stream. - * @throws \RuntimeException on failure. + * @throws \StreamException on failure. */ public function write($string): int { if (!isset($this->stream)) { - throw new RuntimeException('Stream is detached.'); + throw new StreamException(StreamException::STREAM_DETACHED); } if (!$this->writable) { - throw new RuntimeException('Stream is not writable.'); + throw new StreamException(StreamException::NOT_WRITABLE); } return $this->handler->with(function () use ($string) { return fwrite($this->stream, $string); - }, new RuntimeException('Failed write() on stream.')); + }, new StreamException(StreamException::FAIL_WRITE)); } /** @@ -187,19 +181,19 @@ public function isSeekable(): bool * Seek to a position in the stream. * @param int $offset Stream offset * @param int $whence Specifies how the cursor position will be calculated based on the seek offset. - * @throws \RuntimeException on failure. + * @throws \StreamException on failure. */ public function seek($offset, $whence = SEEK_SET): void { if (!isset($this->stream)) { - throw new RuntimeException('Stream is detached.'); + throw new StreamException(StreamException::STREAM_DETACHED); } if (!$this->seekable) { - throw new RuntimeException('Stream is not seekable.'); + throw new StreamException(StreamException::NOT_SEEKABLE); } $result = fseek($this->stream, $offset, $whence); if ($result !== 0) { - throw new RuntimeException('Failed to seek.'); + throw new StreamException(StreamException::FAIL_SEEK); } } @@ -207,7 +201,6 @@ public function seek($offset, $whence = SEEK_SET): void * Seek to the beginning of the stream. * If the stream is not seekable, this method will raise an exception; * otherwise, it will perform a seek(0). - * @throws \RuntimeException on failure. */ public function rewind(): void { @@ -235,20 +228,20 @@ public function isReadable(): bool /** * Returns the remaining contents in a string * @return string - * @throws \RuntimeException if unable to read. - * @throws \RuntimeException if error occurs while reading. + * @throws \StreamException if unable to read. + * @throws \StreamException if error occurs while reading. */ public function getContents(): string { if (!isset($this->stream)) { - throw new RuntimeException('Stream is detached.'); + throw new StreamException(StreamException::STREAM_DETACHED); } if (!$this->readable) { - throw new RuntimeException('Stream is not readable.'); + throw new StreamException(StreamException::NOT_READABLE); } return $this->handler->with(function () { return stream_get_contents($this->stream); - }, new RuntimeException('Failed getContents() on stream.')); + }, new StreamException(StreamException::FAIL_CONTENTS)); } /** @@ -268,6 +261,9 @@ public function __toString(): string } } + + // ---------- Extended methods ------------------------------------------------------------------------------------ + /** * Return underlying resource. * @return resource|null. @@ -285,14 +281,13 @@ public function getResource() */ protected function evalStream(): void { - $meta = $this->getMetadata(); - if (!$meta) { - $this->readable = $this->writable = $this->seekable = false; + if ($this->stream && $meta = $this->getMetadata()) { + $mode = substr($meta['mode'], 0, 2); + $this->readable = in_array($mode, self::$readmodes); + $this->writable = in_array($mode, self::$writemodes); + $this->seekable = $meta['seekable']; return; } - $mode = substr($meta['mode'], 0, 2); - $this->readable = in_array($mode, self::$readmodes); - $this->writable = in_array($mode, self::$writemodes); - $this->seekable = $meta['seekable']; + $this->readable = $this->writable = $this->seekable = false; } } diff --git a/src/Phrity/Net/StreamCollection.php b/src/StreamCollection.php similarity index 83% rename from src/Phrity/Net/StreamCollection.php rename to src/StreamCollection.php index 66d9de6..234fe9c 100644 --- a/src/Phrity/Net/StreamCollection.php +++ b/src/StreamCollection.php @@ -1,20 +1,14 @@ Net > Stream - */ - namespace Phrity\Net; use Countable; use Iterator; use Phrity\Util\ErrorHandler; -use RuntimeException; use TypeError; /** - * Net\StreamCollection class. + * Phrity\Net\StreamCollection class. */ class StreamCollection implements Countable, Iterator { @@ -30,18 +24,19 @@ public function __construct() } - // Collectors and selectors + // ---------- Collectors and selectors ---------------------------------------------------------------------------- /** * Attach stream to collection. * @param Stream $attach Stream to attach. * @param string|null $key Definable name of stream. * @return string Name of stream. + * @throws StreamException If already attached. */ public function attach(Stream $attach, ?string $key = null): string { if ($key && array_key_exists($key, $this->streams)) { - throw new RuntimeException("Stream with name '{$key}' already attached."); + throw new StreamException(StreamException::COLLECT_KEY_CONFLICT, ['key' => $key]); } $key = $key ?: $this->createKey(); $this->streams[$key] = $attach; @@ -108,6 +103,7 @@ public function getWritable(): self * Wait for redable content in stream collection. * @param int $seconds Timeout in seconds. * @return self New collection instance. + * @throws StreamException If fails to select. */ public function waitRead(int $seconds = 60): self { @@ -117,12 +113,15 @@ public function waitRead(int $seconds = 60): self $read[$key] = $stream->getResource(); } } + if (empty($read)) { + return new self(); // Nothing to select + } $changed = $this->handler->with(function () use ($read, $seconds) { $write = $oob = []; stream_select($read, $write, $oob, $seconds); return $read; - }, new RuntimeException('Failed to select streams for reading.')); + }, new StreamException(StreamException::COLLECT_SELECT_ERR)); $ready = new self(); foreach ($changed as $key => $resource) { @@ -132,7 +131,7 @@ public function waitRead(int $seconds = 60): self } - // Countable interface implementation + // ---------- Countable interface implementation ------------------------------------------------------------------ /** * Count contained streams. @@ -144,7 +143,7 @@ public function count(): int } - // Iterator interface implementation + // ---------- Iterator interface implementation ------------------------------------------------------------------- /** * Return the current stream. @@ -190,7 +189,7 @@ public function valid(): bool } - // Protected helper methods + // ---------- Protected helper methods ---------------------------------------------------------------------------- /** * Create unique key. diff --git a/src/StreamException.php b/src/StreamException.php new file mode 100644 index 0000000..9a1ddc9 --- /dev/null +++ b/src/StreamException.php @@ -0,0 +1,73 @@ + 'Stream is detached.', + self::NOT_READABLE => 'Stream is not readable.', + self::NOT_WRITABLE => 'Stream is not writable.', + self::NOT_SEEKABLE => 'Stream is not seekable.', + self::FAIL_READ => 'Failed read() on stream.', + self::FAIL_WRITE => 'Failed write() on stream.', + self::FAIL_SEEK => 'Failed seek() on stream.', + self::FAIL_TELL => 'Failed tell() on stream.', + self::FAIL_CONTENTS => 'Failed getContents() on stream.', + self::FAIL_GETS => 'Failed gets() on stream.', + self::CLIENT_CONNECT_ERR => 'Client could not connect to "{uri}".', + self::SCHEME_TRANSPORT => 'Scheme "{scheme}" is not supported.', + self::SCHEME_HANDLER => 'Could not handle scheme "{scheme}".', + self::SERVER_SOCKET_ERR => 'Could not create socket for "{uri}".', + self::SERVER_CLOSED => 'Server is closed.', + self::SERVER_ACCEPT_ERR => 'Could not accept on socket.', + self::COLLECT_KEY_CONFLICT => 'Stream with name "{key}" already attached.', + self::COLLECT_SELECT_ERR => 'Failed to select streams for reading.', + ]; + + /** + * Create exception. + * @param int $code Error code + * @param array $data Additional data + * @param Throwable|null $previous Previous exception + */ + public function __construct(int $code, array $data = [], ?Throwable $previous = null) + { + $message = self::$messages[$code]; + foreach ($data as $key => $content) { + $message = str_replace('{' . $key . '}', $content, $message); + } + parent::__construct($message, $code, $previous); + } +} diff --git a/src/Phrity/Net/StreamFactory.php b/src/StreamFactory.php similarity index 91% rename from src/Phrity/Net/StreamFactory.php rename to src/StreamFactory.php index a45692a..f8124de 100644 --- a/src/Phrity/Net/StreamFactory.php +++ b/src/StreamFactory.php @@ -1,11 +1,5 @@ Net > Stream - * @see https://www.php-fig.org/psr/psr-17/#24-streamfactoryinterface - */ - namespace Phrity\Net; use InvalidArgumentException; @@ -17,7 +11,8 @@ use RuntimeException; /** - * Net\StreamFactory class. + * Phrity\Net\StreamFactory class. + * @see https://www.php-fig.org/psr/psr-17/#24-streamfactoryinterface */ class StreamFactory implements StreamFactoryInterface { @@ -85,14 +80,13 @@ public function createStreamFromResource($resource): Stream // ---------- Extensions ------------------------------------------------------------------------------------------ /** - * Create a new ocket stream from an existing resource. - * The stream MUST be readable and may be writable. - * @param resource $resource The PHP resource to use as the basis for the stream. - * @return \Phrity\Net\SocketStream A socket stream instance. + * Create a new socket client. + * @param \Psr\Http\Message\UriInterface $uri The URI to connect to. + * @return \Phrity\Net\SocketClient A socket client instance. */ - public function createSocketStreamFromResource($resource): SocketStream + public function createSocketClient(UriInterface $uri): SocketClient { - return new SocketStream($resource); + return new SocketClient($uri); } /** @@ -108,6 +102,17 @@ public function createSocketServer( return new SocketServer($uri, $flags); } + /** + * Create a new ocket stream from an existing resource. + * The stream MUST be readable and may be writable. + * @param resource $resource The PHP resource to use as the basis for the stream. + * @return \Phrity\Net\SocketStream A socket stream instance. + */ + public function createSocketStreamFromResource($resource): SocketStream + { + return new SocketStream($resource); + } + /** * Create a new stream collection. * @return \Phrity\Net\StreamCollection A stream collection. diff --git a/tests/SocketClientTest.php b/tests/SocketClientTest.php new file mode 100644 index 0000000..2456890 --- /dev/null +++ b/tests/SocketClientTest.php @@ -0,0 +1,41 @@ +assertInstanceOf(SocketClient::class, $client); + $this->assertSame($client, $client->setPersistent(true)); + $this->assertSame($client, $client->setTimeout(1)); + $this->assertSame($client, $client->setContext([])); + + $stream = $client->connect(); + $this->assertInstanceOf(SocketStream::class, $stream); + $stream->close(); + } + + public function testClientConnectFailure(): void + { + $uri = new Uri('tcp://localhost:80'); + $client = new SocketClient($uri); + + $this->expectException(StreamException::class); + $this->expectExceptionCode(StreamException::CLIENT_CONNECT_ERR); + $this->expectExceptionMessage('Client could not connect to "tcp://localhost:80".'); + $client->connect(); + } +} diff --git a/tests/classes/SocketServerMock.php b/tests/SocketServerMock.php similarity index 84% rename from tests/classes/SocketServerMock.php rename to tests/SocketServerMock.php index c2ad753..d78cc52 100644 --- a/tests/classes/SocketServerMock.php +++ b/tests/SocketServerMock.php @@ -5,7 +5,9 @@ * @package Phrity > Net > Stream */ -namespace Phrity\Net; +namespace Phrity\Net\Test; + +use Phrity\Net\SocketServer; /** * Net\SocketServerMock class. diff --git a/tests/SocketServerTest.php b/tests/SocketServerTest.php index c7110ed..29db023 100644 --- a/tests/SocketServerTest.php +++ b/tests/SocketServerTest.php @@ -7,10 +7,15 @@ declare(strict_types=1); -namespace Phrity\Net; +namespace Phrity\Net\Test; use PHPUnit\Framework\TestCase; -use RuntimeException; +use Phrity\Net\{ + SocketServer, + StreamException, + Uri +}; +use Phrity\Net\Test\SocketServerMock; class SocketServerTest extends TestCase { @@ -37,6 +42,7 @@ public function testNonBlockingTcpServer(): void 'mode' => 'r+', 'unread_bytes' => 0, 'seekable' => false, + 'uri' => 'tcp://0.0.0.0:8000', ], $server->getMetadata()); $stream = $server->accept(0); $this->assertNull($stream); // Non-blocking, nothing to accept @@ -58,6 +64,7 @@ public function testNonBlockingUnixServer(): void 'mode' => 'r+', 'unread_bytes' => 0, 'seekable' => false, + 'uri' => 'unix:///tmp/test.sock', ], $server->getMetadata()); $stream = $server->accept(0); $this->assertNull($stream); // Non-blocking, nothing to accept @@ -67,24 +74,27 @@ public function testNonBlockingUnixServer(): void public function testUnsupportedScheme(): void { $uri = new Uri('http://0.0.0.0:8000'); - $this->expectException(RuntimeException::class); - $this->expectExceptionMessage("Scheme 'http' is not supported."); + $this->expectException(StreamException::class); + $this->expectExceptionCode(StreamException::SCHEME_TRANSPORT); + $this->expectExceptionMessage('Scheme "http" is not supported.'); $server = new SocketServer($uri); } public function testUnknownScheme(): void { $uri = new Uri('fake://0.0.0.0:8000'); - $this->expectException(RuntimeException::class); - $this->expectExceptionMessage("Could not handle scheme 'fake'."); + $this->expectException(StreamException::class); + $this->expectExceptionCode(StreamException::SCHEME_HANDLER); + $this->expectExceptionMessage('Could not handle scheme "fake".'); $server = new SocketServerMock($uri); } public function testCreateFailure(): void { $uri = new Uri('tcp://0.0.0.0'); - $this->expectException(RuntimeException::class); - $this->expectExceptionMessage("Could not create socket for 'tcp://0.0.0.0'."); + $this->expectException(StreamException::class); + $this->expectExceptionCode(StreamException::SERVER_SOCKET_ERR); + $this->expectExceptionMessage('Could not create socket for "tcp://0.0.0.0".'); $server = new SocketServer($uri); } @@ -100,9 +110,11 @@ public function testBlockingServerTimeout(): void 'mode' => 'r+', 'unread_bytes' => 0, 'seekable' => false, + 'uri' => 'tcp://0.0.0.0:8000', ], $server->getMetadata()); - $this->expectException(RuntimeException::class); - $this->expectExceptionMessage("Could not accept on socket."); + $this->expectException(StreamException::class); + $this->expectExceptionCode(StreamException::SERVER_ACCEPT_ERR); + $this->expectExceptionMessage('Could not accept on socket.'); $stream = $server->accept(0); $server->close(); } @@ -112,8 +124,9 @@ public function testAcceptOnClosedError(): void $uri = new Uri('tcp://0.0.0.0:8000'); $server = new SocketServer($uri); $server->close(); - $this->expectException(RuntimeException::class); - $this->expectExceptionMessage("Server is closed."); + $this->expectException(StreamException::class); + $this->expectExceptionCode(StreamException::SERVER_CLOSED); + $this->expectExceptionMessage('Server is closed.'); $server->accept(); } @@ -122,8 +135,9 @@ public function testSetBlockingOnClosedError(): void $uri = new Uri('tcp://0.0.0.0:8000'); $server = new SocketServer($uri); $server->close(); - $this->expectException(RuntimeException::class); - $this->expectExceptionMessage("Server is closed."); + $this->expectException(StreamException::class); + $this->expectExceptionCode(StreamException::SERVER_CLOSED); + $this->expectExceptionMessage('Server is closed.'); $server->setBlocking(true); } } diff --git a/tests/SocketStreamTest.php b/tests/SocketStreamTest.php index 7cab084..656f373 100644 --- a/tests/SocketStreamTest.php +++ b/tests/SocketStreamTest.php @@ -7,10 +7,14 @@ declare(strict_types=1); -namespace Phrity\Net; +namespace Phrity\Net\Test; use PHPUnit\Framework\TestCase; -use RuntimeException; +use Phrity\Net\{ + SocketStream, + StreamFactory, + StreamException +}; class SocketStreamTest extends TestCase { @@ -20,11 +24,16 @@ public function testTempStream(): void $resource = fopen(__DIR__ . '/fixtures/stream.txt', 'r+'); $stream = $factory->createSocketStreamFromResource($resource); + $this->assertTrue($stream->isConnected()); $this->assertEquals('', $stream->getRemoteName()); + $this->assertEquals('', $stream->getLocalName()); + $this->assertEquals('stream', $stream->getResourceType()); $this->assertTrue($stream->isBlocking()); $this->assertTrue($stream->setBlocking(false)); $this->assertFalse($stream->isBlocking()); + + $this->assertFalse($stream->setTimeout(1, 2)); } public function testSetBlockingOnClosed(): void @@ -34,8 +43,53 @@ public function testSetBlockingOnClosed(): void $stream = $factory->createSocketStreamFromResource($resource); $stream->close(); $this->assertNull($stream->isBlocking()); - $this->expectException(RuntimeException::class); - $this->expectExceptionMessage("Stream is detached."); + $this->expectException(StreamException::class); + $this->expectExceptionCode(StreamException::STREAM_DETACHED); + $this->expectExceptionMessage('Stream is detached.'); $stream->setBlocking(false); } + + public function testSetTimeoutOnClosed(): void + { + $factory = new StreamFactory(); + $resource = fopen(__DIR__ . '/fixtures/stream.txt', 'r+'); + $stream = $factory->createSocketStreamFromResource($resource); + $stream->close(); + $this->assertNull($stream->isBlocking()); + $this->expectException(StreamException::class); + $this->expectExceptionCode(StreamException::STREAM_DETACHED); + $this->expectExceptionMessage('Stream is detached.'); + $stream->setTimeout(1, 2); + } + + public function testReadLine(): void + { + $factory = new StreamFactory(); + $resource = fopen(__DIR__ . '/fixtures/stream-readonly.txt', 'r'); + $stream = $factory->createSocketStreamFromResource($resource); + $this->assertEquals('Test case for streams.', $stream->readLine(1024)); + } + + public function testReadLineOnClosed(): void + { + $factory = new StreamFactory(); + $resource = fopen(__DIR__ . '/fixtures/stream-readonly.txt', 'r'); + $stream = $factory->createSocketStreamFromResource($resource); + $stream->close(); + $this->expectException(StreamException::class); + $this->expectExceptionCode(StreamException::STREAM_DETACHED); + $this->expectExceptionMessage('Stream is detached.'); + $stream->readLine(1024); + } + + public function testWriteOnlyReadLineError(): void + { + $factory = new StreamFactory(); + $resource = fopen(__DIR__ . '/fixtures/stream-writeonly.txt', 'w'); + $stream = $factory->createSocketStreamFromResource($resource); + $this->expectException(StreamException::class); + $this->expectExceptionCode(StreamException::NOT_READABLE); + $this->expectExceptionMessage('Stream is not readable.'); + $stream->readLine(1024); + } } diff --git a/tests/StreamCollectionTest.php b/tests/StreamCollectionTest.php index 32951dc..9712cfe 100644 --- a/tests/StreamCollectionTest.php +++ b/tests/StreamCollectionTest.php @@ -7,10 +7,16 @@ declare(strict_types=1); -namespace Phrity\Net; +namespace Phrity\Net\Test; use PHPUnit\Framework\TestCase; -use RuntimeException; +use Phrity\Net\{ + SocketServer, + SocketStream, + StreamCollection, + StreamException +}; +use Phrity\Net\Uri; use TypeError; class StreamCollectionTest extends TestCase @@ -32,17 +38,15 @@ public function testCollection(): void } $readable = $collection->getReadable(); - $this->assertCount(1, $readable); + $this->assertCount(2, $readable); foreach ($readable as $key => $item) { - $this->assertSame('@stream', $key); - $this->assertSame($stream, $item); + $this->assertSame($key == '@stream' ? $stream : $server, $item); } $writable = $collection->getWritable(); - $this->assertCount(1, $writable); + $this->assertCount(2, $writable); foreach ($writable as $key => $item) { - $this->assertSame('@stream', $key); - $this->assertSame($stream, $item); + $this->assertSame($key == '@stream' ? $stream : $server, $item); } $changed = $collection->waitRead(10); // Should not block @@ -70,7 +74,9 @@ public function testAttachError(): void $stream = new SocketStream($resource); $collection = new StreamCollection(); $collection->attach($stream, 'my-key'); - $this->expectException(RuntimeException::class); + $this->expectException(StreamException::class); + $this->expectExceptionCode(StreamException::COLLECT_KEY_CONFLICT); + $this->expectExceptionMessage('Stream with name "my-key" already attached.'); $collection->attach($stream, 'my-key'); } @@ -80,4 +86,11 @@ public function testDetachhError(): void $this->expectException(TypeError::class); $collection->detach(1); } + + public function testEmptyCollection(): void + { + $collection = new StreamCollection(); + $changed = $collection->waitRead(10); // Should not block + $this->assertEmpty($changed); + } } diff --git a/tests/StreamFactoryTest.php b/tests/StreamFactoryTest.php index 947eddd..c706ec0 100644 --- a/tests/StreamFactoryTest.php +++ b/tests/StreamFactoryTest.php @@ -7,10 +7,18 @@ declare(strict_types=1); -namespace Phrity\Net; +namespace Phrity\Net\Test; use InvalidArgumentException; use PHPUnit\Framework\TestCase; +use Phrity\Net\{ + SocketClient, + SocketServer, + SocketStream, + Stream, + StreamCollection, + StreamFactory, +}; use Phrity\Net\Uri; use Psr\Http\Message\{ StreamFactoryInterface, @@ -146,6 +154,14 @@ public function testCreateSocketServer(): void $server->close(); } + public function testCreateSocketClient(): void + { + $url = new Uri('tcp://0.0.0.0:8000'); + $factory = new StreamFactory(); + $client = $factory->createSocketClient($url); + $this->assertInstanceOf(SocketClient::class, $client); + } + public function testCreateStreamCollection(): void { $factory = new StreamFactory(); diff --git a/tests/StreamTest.php b/tests/StreamTest.php index 466fc10..f390b91 100644 --- a/tests/StreamTest.php +++ b/tests/StreamTest.php @@ -7,10 +7,15 @@ declare(strict_types=1); -namespace Phrity\Net; +namespace Phrity\Net\Test; use InvalidArgumentException; use PHPUnit\Framework\TestCase; +use Phrity\Net\{ + Stream, + StreamFactory, + StreamException +}; use Phrity\Util\ErrorHandler; use Psr\Http\Message\StreamInterface; use RuntimeException; @@ -117,8 +122,9 @@ public function testSeekFailure(): void { $factory = new StreamFactory(); $stream = $factory->createStream('This is a temporary test stream'); - $this->expectException(RuntimeException::class); - $this->expectExceptionMessage("Failed to seek."); + $this->expectException(StreamException::class); + $this->expectExceptionCode(StreamException::FAIL_SEEK); + $this->expectExceptionMessage('Failed seek() on stream.'); $stream->seek(-1); $stream->close(); } @@ -129,8 +135,9 @@ public function testTellOnClosed(): void $stream = $factory->createStream('This is a temporary test stream'); $stream->close(); - $this->expectException(RuntimeException::class); - $this->expectExceptionMessage("Stream is detached."); + $this->expectException(StreamException::class); + $this->expectExceptionCode(StreamException::STREAM_DETACHED); + $this->expectExceptionMessage('Stream is detached.'); $stream->tell(); $stream->close(); } @@ -141,8 +148,9 @@ public function testReadOnClosed(): void $stream = $factory->createStream('This is a temporary test stream'); $stream->close(); - $this->expectException(RuntimeException::class); - $this->expectExceptionMessage("Stream is detached."); + $this->expectException(StreamException::class); + $this->expectExceptionCode(StreamException::STREAM_DETACHED); + $this->expectExceptionMessage('Stream is detached.'); $stream->read(4); $stream->close(); } @@ -153,8 +161,9 @@ public function testWriteOnClosed(): void $stream = $factory->createStream('This is a temporary test stream'); $stream->close(); - $this->expectException(RuntimeException::class); - $this->expectExceptionMessage("Stream is detached."); + $this->expectException(StreamException::class); + $this->expectExceptionCode(StreamException::STREAM_DETACHED); + $this->expectExceptionMessage('Stream is detached.'); $stream->write("Will fail"); $stream->close(); } @@ -165,8 +174,9 @@ public function testSeekOnClosed(): void $stream = $factory->createStream('This is a temporary test stream'); $stream->close(); - $this->expectException(RuntimeException::class); - $this->expectExceptionMessage("Stream is detached."); + $this->expectException(StreamException::class); + $this->expectExceptionCode(StreamException::STREAM_DETACHED); + $this->expectExceptionMessage('Stream is detached.'); $stream->seek(0); $stream->close(); } @@ -177,8 +187,9 @@ public function testRewindOnClosed(): void $stream = $factory->createStream('This is a temporary test stream'); $stream->close(); - $this->expectException(RuntimeException::class); - $this->expectExceptionMessage("Stream is detached."); + $this->expectException(StreamException::class); + $this->expectExceptionCode(StreamException::STREAM_DETACHED); + $this->expectExceptionMessage('Stream is detached.'); $stream->rewind(); $stream->close(); } @@ -189,8 +200,9 @@ public function testGetContentsOnClosed(): void $stream = $factory->createStream('This is a temporary test stream'); $stream->close(); - $this->expectException(RuntimeException::class); - $this->expectExceptionMessage("Stream is detached."); + $this->expectException(StreamException::class); + $this->expectExceptionCode(StreamException::STREAM_DETACHED); + $this->expectExceptionMessage('Stream is detached.'); $stream->getContents(); $stream->close(); } @@ -233,8 +245,9 @@ public function testReadOnlyWriteError(): void $file = __DIR__ . '/fixtures/stream-readonly.txt'; $stream = $factory->createStreamFromFile($file, 'r'); - $this->expectException(RuntimeException::class); - $this->expectExceptionMessage("Stream is not writable."); + $this->expectException(StreamException::class); + $this->expectExceptionCode(StreamException::NOT_WRITABLE); + $this->expectExceptionMessage('Stream is not writable.'); $stream->write("Should fail"); $stream->close(); } @@ -277,8 +290,9 @@ public function testWriteOnlyReadError(): void $file = __DIR__ . '/fixtures/stream-writeonly.txt'; $stream = $factory->createStreamFromFile($file, 'w'); - $this->expectException(RuntimeException::class); - $this->expectExceptionMessage("Stream is not readable."); + $this->expectException(StreamException::class); + $this->expectExceptionCode(StreamException::NOT_READABLE); + $this->expectExceptionMessage('Stream is not readable.'); $stream->read(10); $stream->close(); } @@ -289,8 +303,9 @@ public function testWriteOnlyGetContentsError(): void $file = __DIR__ . '/fixtures/stream-writeonly.txt'; $stream = $factory->createStreamFromFile($file, 'w'); - $this->expectException(RuntimeException::class); - $this->expectExceptionMessage("Stream is not readable."); + $this->expectException(StreamException::class); + $this->expectExceptionCode(StreamException::NOT_READABLE); + $this->expectExceptionMessage('Stream is not readable.'); $stream->getContents(); $stream->close(); } @@ -347,8 +362,9 @@ public function testSeekOnRemoteError(): void { $remote = fopen('https://phrity.sirn.se/', 'r'); $stream = new Stream($remote); - $this->expectException(RuntimeException::class); - $this->expectExceptionMessage("Stream is not seekable."); + $this->expectException(StreamException::class); + $this->expectExceptionCode(StreamException::NOT_SEEKABLE); + $this->expectExceptionMessage('Stream is not seekable.'); $stream->seek(0); $stream->close(); }