Skip to content

Commit

Permalink
Implemented PSR-7 compatible stream abstraction (#1450)
Browse files Browse the repository at this point in the history
* Implemented PSR-7 compatible stream abstraction

* Added missing coverage, fixing static analysis issues

* Fixed return type

* Added missing throws statements

* Added ignore lines
  • Loading branch information
vladvildanov committed Apr 10, 2024
1 parent 40d72d7 commit c83ee6a
Show file tree
Hide file tree
Showing 20 changed files with 1,835 additions and 410 deletions.
3 changes: 2 additions & 1 deletion composer.json
Expand Up @@ -22,7 +22,8 @@
}
],
"require": {
"php": "^7.2 || ^8.0"
"php": "^7.2 || ^8.0",
"psr/http-message": "^1.0|^2.0"
},
"require-dev": {
"friendsofphp/php-cs-fixer": "^3.3",
Expand Down
4 changes: 2 additions & 2 deletions examples/sharded_dispatcher_loop.php
Expand Up @@ -35,8 +35,8 @@
'tcp://127.0.0.1:6373?read_write_timeout=0',
'tcp://127.0.0.1:6374?read_write_timeout=0',
], [
'cluster' => 'redis',
]);
'cluster' => 'redis',
]);

// 2. Run pub/sub loop.
$pubSub = $client->pubSubLoop();
Expand Down
4 changes: 2 additions & 2 deletions examples/sharded_pubsub_consumer.php
Expand Up @@ -21,8 +21,8 @@
'tcp://127.0.0.1:6373?read_write_timeout=0',
'tcp://127.0.0.1:6374?read_write_timeout=0',
], [
'cluster' => 'redis',
]);
'cluster' => 'redis',
]);

// 2. Run pub/sub loop. Sharded channels belongs to different shards.
$pubSub = $client->pubSubLoop();
Expand Down
2 changes: 1 addition & 1 deletion examples/transaction_using_cas.php
Expand Up @@ -32,7 +32,7 @@ function zpop($client, $key)
'cas' => true, // Initialize with support for CAS operations
'watch' => $key, // Key that needs to be WATCHed to detect changes
'retry' => 3, // Number of retries on aborted transactions, after
// which the client bails out with an exception.
// which the client bails out with an exception.
];

$client->transaction($options, function ($tx) use ($key, &$element) {
Expand Down
41 changes: 17 additions & 24 deletions src/Connection/AbstractConnection.php
Expand Up @@ -12,10 +12,10 @@

namespace Predis\Connection;

use InvalidArgumentException;
use Predis\Command\CommandInterface;
use Predis\Command\RawCommand;
use Predis\CommunicationException;
use Predis\Connection\Resource\Exception\StreamInitException;
use Predis\Protocol\Parser\ParserStrategyResolver;
use Predis\Protocol\Parser\Strategy\ParserStrategyInterface;
use Predis\Protocol\ProtocolException;
Expand All @@ -36,7 +36,7 @@ abstract class AbstractConnection implements NodeConnectionInterface
*/
protected $clientId;

private $resource;
protected $resource;
private $cachedId;

protected $parameters;
Expand All @@ -51,7 +51,7 @@ abstract class AbstractConnection implements NodeConnectionInterface
*/
public function __construct(ParametersInterface $parameters)
{
$this->parameters = $this->assertParameters($parameters);
$this->parameters = $parameters;
$this->setParserStrategy();
}

Expand All @@ -64,23 +64,6 @@ public function __destruct()
$this->disconnect();
}

/**
* Checks some of the parameters used to initialize the connection.
*
* @param ParametersInterface $parameters Initialization parameters for the connection.
*
* @return ParametersInterface
* @throws InvalidArgumentException
*/
abstract protected function assertParameters(ParametersInterface $parameters);

/**
* Creates the underlying resource used to communicate with Redis.
*
* @return mixed
*/
abstract protected function createResource();

/**
* {@inheritdoc}
*/
Expand All @@ -97,6 +80,14 @@ public function hasDataToRead(): bool
return true;
}

/**
* Creates a stream resource to communicate with Redis.
*
* @return mixed
* @throws StreamInitException
*/
abstract protected function createResource();

/**
* {@inheritdoc}
*/
Expand Down Expand Up @@ -156,10 +147,11 @@ public function readResponse(CommandInterface $command)
/**
* Helper method to handle connection errors.
*
* @param string $message Error message.
* @param int $code Error code.
* @param string $message Error message.
* @param int $code Error code.
* @throws CommunicationException
*/
protected function onConnectionError($message, $code = 0)
protected function onConnectionError($message, $code = 0): void
{
CommunicationException::handle(
new ConnectionException($this, "$message [{$this->getParameters()}]", $code)
Expand All @@ -169,7 +161,8 @@ protected function onConnectionError($message, $code = 0)
/**
* Helper method to handle protocol errors.
*
* @param string $message Error message.
* @param string $message Error message.
* @throws CommunicationException
*/
protected function onProtocolError($message)
{
Expand Down
44 changes: 28 additions & 16 deletions src/Connection/CompositeStreamConnection.php
Expand Up @@ -16,24 +16,28 @@
use Predis\Command\CommandInterface;
use Predis\Protocol\ProtocolProcessorInterface;
use Predis\Protocol\Text\ProtocolProcessor as TextProtocolProcessor;
use Psr\Http\Message\StreamInterface;
use RuntimeException;

/**
* Connection abstraction to Redis servers based on PHP's stream that uses an
* external protocol processor defining the protocol used for the communication.
*
* @method StreamInterface getResource()
*/
class CompositeStreamConnection extends StreamConnection implements CompositeConnectionInterface
{
protected $protocol;

/**
* @param ParametersInterface $parameters Initialization parameters for the connection.
* @param ProtocolProcessorInterface $protocol Protocol processor.
* @param ParametersInterface $parameters Initialization parameters for the connection.
* @param ProtocolProcessorInterface|null $protocol Protocol processor.
*/
public function __construct(
ParametersInterface $parameters,
?ProtocolProcessorInterface $protocol = null
) {
$this->parameters = $this->assertParameters($parameters);
parent::__construct($parameters);
$this->protocol = $protocol ?: new TextProtocolProcessor();
}

Expand Down Expand Up @@ -63,17 +67,21 @@ public function readBuffer($length)
}

$value = '';
$socket = $this->getResource();
$stream = $this->getResource();

do {
$chunk = fread($socket, $length);
if ($stream->eof()) {
$this->onStreamError(new RuntimeException('Stream is already at the end'), '');
}

if ($chunk === false || $chunk === '') {
$this->onConnectionError('Error while reading bytes from the server.');
do {
try {
$chunk = $stream->read($length);
} catch (RuntimeException $e) {
$this->onStreamError($e, 'Error while reading bytes from the server.');
}

$value .= $chunk;
} while (($length -= strlen($chunk)) > 0);
$value .= $chunk; // @phpstan-ignore-line
} while (($length -= strlen($chunk)) > 0); // @phpstan-ignore-line

return $value;
}
Expand All @@ -84,16 +92,20 @@ public function readBuffer($length)
public function readLine()
{
$value = '';
$socket = $this->getResource();
$stream = $this->getResource();

do {
$chunk = fgets($socket);
if ($stream->eof()) {
$this->onStreamError(new RuntimeException('Stream is already at the end'), '');
}

if ($chunk === false || $chunk === '') {
$this->onConnectionError('Error while reading line from the server.');
do {
try {
$chunk = $stream->read(-1);
} catch (RuntimeException $e) {
$this->onStreamError($e, 'Error while reading bytes from the server.');
}

$value .= $chunk;
$value .= $chunk; // @phpstan-ignore-line
} while (substr($value, -2) !== "\r\n");

return substr($value, 0, -2);
Expand Down
19 changes: 19 additions & 0 deletions src/Connection/Resource/Exception/StreamInitException.php
@@ -0,0 +1,19 @@
<?php

/*
* This file is part of the Predis package.
*
* (c) 2009-2020 Daniele Alessandri
* (c) 2021-2023 Till Krüss
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Predis\Connection\Resource\Exception;

use Predis\PredisException;

class StreamInitException extends PredisException
{
}

0 comments on commit c83ee6a

Please sign in to comment.