Skip to content

Commit

Permalink
Add support for connect timeout and improve connection callbacks.
Browse files Browse the repository at this point in the history
Connection callbacks for connect and error events are now handled in a
different manner. This is mostly likely not going to be the final design
for connection callbacks and the code is still ugly.
  • Loading branch information
nrk committed May 25, 2012
1 parent 844cfcd commit 1c5eeb1
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 6 deletions.
1 change: 0 additions & 1 deletion README.md
Expand Up @@ -65,7 +65,6 @@ server profiles or define commands with their own arguments filter / reply parse
## Most immediate TODO list ##

- Provide a better API that suits best the asynchronous model.
- Add connection timeouts (we actually need timers to get into React\EventLoop first).
- Add tests. Lots of them.
- Everything else.

Expand Down
37 changes: 34 additions & 3 deletions lib/Predis/Async/Client.php
Expand Up @@ -124,16 +124,47 @@ protected function initializeConnection($parameters, ClientOptionsInterface $opt
$connection = new AsynchronousConnection($parameters ?: self::DEFAULT_SERVER, $this->getEventLoop());

if (isset($options->on_connect)) {
$connection->setConnectCallback($options->on_connect);
$this->setConnectCallback($connection, $options->on_connect);
}

if (isset($options->on_error)) {
$connection->setErrorCallback($options->on_error);
$this->setErrorCallback($connection, $options->on_error);
}
}

return $connection;
}

/**
* Sets the callback used to notify the client after a successful connect operation.
*
* @param AsynchronousConnectionInterface $connection Connection instance.
* @param mixed $callback Callback for connection event.
*/
protected function setConnectCallback(AsynchronousConnectionInterface $connection, $callback)
{
$client = $this;

$connection->setConnectCallback(function ($connection) use ($callback, $client) {
call_user_func($callback, $client, $connection);
});
}

/**
* Sets the callback used to notify the client after a connection error.
*
* @param AsynchronousConnectionInterface $connection Connection instance.
* @param mixed $callback Callback for error event.
*/
protected function setErrorCallback(AsynchronousConnectionInterface $connection, $callback)
{
$client = $this;

$connection->setErrorCallback(function ($connection, $exception) use ($callback, $client) {
call_user_func($callback, $client, $exception, $connection);
});
}

/**
* Returns the server profile used by the client.
*
Expand Down Expand Up @@ -172,7 +203,7 @@ public function getOptions()
public function connect($callback)
{
if (isset($callback)) {
$this->connection->setConnectCallback($callback);
$this->setConnectCallback($this->getConnection(), $callback);
}

$this->connection->connect();
Expand Down
21 changes: 19 additions & 2 deletions lib/Predis/Async/Connection/AsynchronousConnection.php
Expand Up @@ -31,6 +31,7 @@ class AsynchronousConnection implements AsynchronousConnectionInterface
protected $buffer;
protected $cmdqueue;
protected $state;
protected $timeout = null;
protected $stateCbk = null;
protected $onError = null;
protected $onConnect = null;
Expand Down Expand Up @@ -96,9 +97,21 @@ protected function createResource()
}

stream_set_blocking($socket, 0);

$this->setState('CONNECTING');

$timeout = $this->parameters->timeout;
$callbackArgs = array($this, $this->onError);

$this->eventloop->addWriteStream($socket, array($this, 'onConnect'));
$this->timeout = $this->eventloop->addTimer($timeout, function ($timer, $loop) use ($callbackArgs) {
list($connection, $onError) = $callbackArgs;

$connection->disconnect();

if (isset($onError)) {
call_user_func($onError, $connection, new ConnectionException($connection, 'Connection timed out'));
}
});

return $socket;
}
Expand Down Expand Up @@ -169,6 +182,9 @@ public function onConnect()
$socket = $this->getResource();
$this->setState('READY');

$this->eventloop->cancelTimer($this->timeout);
$this->timeout = null;

$this->eventloop->removeWriteStream($socket);
$this->eventloop->addReadStream($socket, $this->cbkStreamReadable);

Expand Down Expand Up @@ -197,8 +213,9 @@ public function setErrorCallback($callback) {
*/
protected function onError(\Exception $exception)
{
$this->disconnect();

if (isset($this->onError)) {
$this->disconnect();
call_user_func($this->onError, $this, $exception);
}
}
Expand Down

0 comments on commit 1c5eeb1

Please sign in to comment.