diff --git a/README.md b/README.md index dab6337..d7ab10d 100644 --- a/README.md +++ b/README.md @@ -65,7 +65,6 @@ server profiles or define commands with their own arguments filter / reply parse ## Most immediate TODO list ## - Provide a better API that suits best the asynchronous model. -- Add connection timeouts (we actually need timers to get into React\EventLoop first). - Add tests. Lots of them. - Everything else. diff --git a/lib/Predis/Async/Client.php b/lib/Predis/Async/Client.php index 6461b1f..fb8bd31 100644 --- a/lib/Predis/Async/Client.php +++ b/lib/Predis/Async/Client.php @@ -124,16 +124,47 @@ protected function initializeConnection($parameters, ClientOptionsInterface $opt $connection = new AsynchronousConnection($parameters ?: self::DEFAULT_SERVER, $this->getEventLoop()); if (isset($options->on_connect)) { - $connection->setConnectCallback($options->on_connect); + $this->setConnectCallback($connection, $options->on_connect); } + if (isset($options->on_error)) { - $connection->setErrorCallback($options->on_error); + $this->setErrorCallback($connection, $options->on_error); } } return $connection; } + /** + * Sets the callback used to notify the client after a successful connect operation. + * + * @param AsynchronousConnectionInterface $connection Connection instance. + * @param mixed $callback Callback for connection event. + */ + protected function setConnectCallback(AsynchronousConnectionInterface $connection, $callback) + { + $client = $this; + + $connection->setConnectCallback(function ($connection) use ($callback, $client) { + call_user_func($callback, $client, $connection); + }); + } + + /** + * Sets the callback used to notify the client after a connection error. + * + * @param AsynchronousConnectionInterface $connection Connection instance. + * @param mixed $callback Callback for error event. + */ + protected function setErrorCallback(AsynchronousConnectionInterface $connection, $callback) + { + $client = $this; + + $connection->setErrorCallback(function ($connection, $exception) use ($callback, $client) { + call_user_func($callback, $client, $exception, $connection); + }); + } + /** * Returns the server profile used by the client. * @@ -172,7 +203,7 @@ public function getOptions() public function connect($callback) { if (isset($callback)) { - $this->connection->setConnectCallback($callback); + $this->setConnectCallback($this->getConnection(), $callback); } $this->connection->connect(); diff --git a/lib/Predis/Async/Connection/AsynchronousConnection.php b/lib/Predis/Async/Connection/AsynchronousConnection.php index 2ef5536..6530ba9 100644 --- a/lib/Predis/Async/Connection/AsynchronousConnection.php +++ b/lib/Predis/Async/Connection/AsynchronousConnection.php @@ -31,6 +31,7 @@ class AsynchronousConnection implements AsynchronousConnectionInterface protected $buffer; protected $cmdqueue; protected $state; + protected $timeout = null; protected $stateCbk = null; protected $onError = null; protected $onConnect = null; @@ -96,9 +97,21 @@ protected function createResource() } stream_set_blocking($socket, 0); - $this->setState('CONNECTING'); + + $timeout = $this->parameters->timeout; + $callbackArgs = array($this, $this->onError); + $this->eventloop->addWriteStream($socket, array($this, 'onConnect')); + $this->timeout = $this->eventloop->addTimer($timeout, function ($timer, $loop) use ($callbackArgs) { + list($connection, $onError) = $callbackArgs; + + $connection->disconnect(); + + if (isset($onError)) { + call_user_func($onError, $connection, new ConnectionException($connection, 'Connection timed out')); + } + }); return $socket; } @@ -169,6 +182,9 @@ public function onConnect() $socket = $this->getResource(); $this->setState('READY'); + $this->eventloop->cancelTimer($this->timeout); + $this->timeout = null; + $this->eventloop->removeWriteStream($socket); $this->eventloop->addReadStream($socket, $this->cbkStreamReadable); @@ -197,8 +213,9 @@ public function setErrorCallback($callback) { */ protected function onError(\Exception $exception) { + $this->disconnect(); + if (isset($this->onError)) { - $this->disconnect(); call_user_func($this->onError, $this, $exception); } }