Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix channel wait timeouts and endless loops #642

Merged
merged 1 commit into from
Feb 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions PhpAmqpLib/Channel/AbstractChannel.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use PhpAmqpLib\Connection\AbstractConnection;
use PhpAmqpLib\Exception\AMQPChannelClosedException;
use PhpAmqpLib\Exception\AMQPInvalidFrameException;
use PhpAmqpLib\Exception\AMQPNoDataException;
use PhpAmqpLib\Exception\AMQPNotImplementedException;
use PhpAmqpLib\Exception\AMQPOutOfBoundsException;
use PhpAmqpLib\Exception\AMQPOutOfRangeException;
Expand Down Expand Up @@ -213,7 +214,7 @@ public function dispatch($method_sig, $args, $amqpMessage)
}

/**
* @param int $timeout
* @param int|float|null $timeout
* @return array|mixed
*/
public function next_frame($timeout = 0)
Expand Down Expand Up @@ -318,7 +319,7 @@ protected function createMessage($propertyReader, $contentReader)
*
* @param array $allowed_methods
* @param bool $non_blocking
* @param int $timeout
* @param int|float|null $timeout
* @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException
* @throws \PhpAmqpLib\Exception\AMQPRuntimeException
* @throws \PhpAmqpLib\Exception\AMQPTimeoutException
Expand All @@ -334,9 +335,19 @@ public function wait($allowed_methods = null, $non_blocking = false, $timeout =
return $this->dispatch_deferred_method($deferred['queued_method']);
}

// timeouts must be deactivated for non-blocking actions
if (true === $non_blocking) {
$timeout = null;
}

// No deferred methods? wait for new ones
while (true) {
list($frame_type, $payload) = $this->next_frame($timeout);
try {
list($frame_type, $payload) = $this->next_frame($timeout);
} catch (AMQPNoDataException $e) {
// no data ready for non-blocking actions - stop and exit
break;
}

$this->validate_method_frame($frame_type);
$this->validate_frame_payload($payload);
Expand Down
19 changes: 12 additions & 7 deletions PhpAmqpLib/Connection/AbstractConnection.php
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
<?php
namespace PhpAmqpLib\Connection;

use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Channel\AbstractChannel;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use PhpAmqpLib\Exception\AMQPHeartbeatMissedException;
use PhpAmqpLib\Exception\AMQPInvalidFrameException;
use PhpAmqpLib\Exception\AMQPNoDataException;
use PhpAmqpLib\Exception\AMQPProtocolConnectionException;
use PhpAmqpLib\Exception\AMQPRuntimeException;
use PhpAmqpLib\Exception\AMQPTimeoutException;
Expand Down Expand Up @@ -260,6 +261,8 @@ protected function connect()
// Something went wrong, set the connection status
$this->setIsConnected(false);
$this->closeChannels();
$this->close_input();
$this->close_socket();
throw $e; // Rethrow exception
}
}
Expand Down Expand Up @@ -330,7 +333,7 @@ public function set_close_on_destruct($close = true)

protected function close_input()
{
$this->debug->debug_msg('closing input');
$this->debug && $this->debug->debug_msg('closing input');

if (!is_null($this->input)) {
$this->input->close();
Expand All @@ -340,7 +343,7 @@ protected function close_input()

protected function close_socket()
{
$this->debug->debug_msg('closing socket');
$this->debug && $this->debug->debug_msg('closing socket');

if (!is_null($this->getIO())) {
$this->getIO()->close();
Expand Down Expand Up @@ -516,7 +519,7 @@ protected function prepare_channel_method_frame($channel, $method_sig, $args = '
/**
* Waits for a frame from the server
*
* @param int $timeout
* @param int|float|null $timeout
* @return array
* @throws \Exception
* @throws \PhpAmqpLib\Exception\AMQPTimeoutException
Expand Down Expand Up @@ -556,6 +559,9 @@ protected function wait_frame($timeout = 0)
} catch (AMQPTimeoutException $e) {
$this->input->setTimeout($currentTimeout);
throw $e;
} catch (AMQPNoDataException $e) {
$this->input->setTimeout($currentTimeout);
throw $e;
}

$this->input->setTimeout($currentTimeout);
Expand All @@ -574,7 +580,7 @@ protected function wait_frame($timeout = 0)
* Waits for a frame from the server destined for a particular channel.
*
* @param string $channel_id
* @param int $timeout
* @param int|float|null $timeout
* @return array
*/
protected function wait_channel($channel_id, $timeout = 0)
Expand All @@ -585,8 +591,7 @@ protected function wait_channel($channel_id, $timeout = 0)
$now = time();
try {
list($frame_type, $frame_channel, $payload) = $this->wait_frame($_timeout);
}
catch ( AMQPTimeoutException $e ) {
} catch (AMQPTimeoutException $e) {
if ( $this->heartbeat && microtime(true) - ($this->heartbeat*2) > $this->last_frame ) {
$this->debug->debug_msg("missed server heartbeat (at threshold * 2)");
$this->setIsConnected(false);
Expand Down
10 changes: 10 additions & 0 deletions PhpAmqpLib/Exception/AMQPNoDataException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

namespace PhpAmqpLib\Exception;

/**
* Used mostly in non-blocking methods when no data is ready for processing.
*/
class AMQPNoDataException extends AMQPRuntimeException
{
}
4 changes: 2 additions & 2 deletions PhpAmqpLib/Helper/MiscHelper.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ public static function saveBytes($bytes)
* different arguments
*
* @param int|float $number
* @return array
* @return int[]
*/
public static function splitSecondsMicroseconds($number)
{
return array(floor($number), ($number - floor($number)) * 1000000);
return array((int)floor($number), (int)(fmod($number, 1) * 1000000));
}

/**
Expand Down
42 changes: 28 additions & 14 deletions PhpAmqpLib/Wire/AMQPReader.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use PhpAmqpLib\Exception\AMQPDataReadException;
use PhpAmqpLib\Exception\AMQPInvalidArgumentException;
use PhpAmqpLib\Exception\AMQPNoDataException;
use PhpAmqpLib\Exception\AMQPOutOfBoundsException;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Exception\AMQPIOWaitException;
Expand Down Expand Up @@ -41,7 +42,7 @@ class AMQPReader extends AbstractClient
/** @var bool */
protected $is64bits;

/** @var int */
/** @var int|float|null */
protected $timeout;

/** @var int */
Expand Down Expand Up @@ -111,28 +112,40 @@ public function read($n)
*
* AMQPTimeoutException can be raised if the timeout is set
*
* @throws \PhpAmqpLib\Exception\AMQPIOWaitException
* @throws \PhpAmqpLib\Exception\AMQPTimeoutException
* @throws \PhpAmqpLib\Exception\AMQPIOWaitException on network errors
* @throws \PhpAmqpLib\Exception\AMQPTimeoutException when timeout is set and no data received
* @throws \PhpAmqpLib\Exception\AMQPNoDataException when no data is ready to read from IO
*/
protected function wait()
{
if ($this->getTimeout() == 0) {
return null;
$timeout = $this->getTimeout();
if (null === $timeout) {
// timeout=null just poll state and return instantly
$sec = 0;
$usec = 0;
} elseif ($timeout > 0) {
list($sec, $usec) = MiscHelper::splitSecondsMicroseconds($this->getTimeout());
} else {
// wait indefinitely for data if timeout=0
$sec = null;
$usec = 0;
}

// wait ..
list($sec, $usec) = MiscHelper::splitSecondsMicroseconds($this->getTimeout());
$result = $this->io->select($sec, $usec);

if ($result === false) {
throw new AMQPIOWaitException('A network error occured while awaiting for incoming data');
throw new AMQPIOWaitException('A network error occurred while awaiting for incoming data');
}

if ($result === 0) {
throw new AMQPTimeoutException(sprintf(
'The connection timed out after %s sec while awaiting incoming data',
$this->getTimeout()
));
if ($timeout > 0) {
throw new AMQPTimeoutException(sprintf(
'The connection timed out after %s sec while awaiting incoming data',
$timeout
));
} else {
throw new AMQPNoDataException('No data is ready to read');
}
}
}

Expand All @@ -141,6 +154,7 @@ protected function wait()
* @return string
* @throws \RuntimeException
* @throws \PhpAmqpLib\Exception\AMQPDataReadException
* @throws \PhpAmqpLib\Exception\AMQPNoDataException
*/
protected function rawread($n)
{
Expand Down Expand Up @@ -533,15 +547,15 @@ protected function tell()
/**
* Sets the timeout (second)
*
* @param int $timeout
* @param int|float|null $timeout
*/
public function setTimeout($timeout)
{
$this->timeout = $timeout;
}

/**
* @return int
* @return int|float
*/
public function getTimeout()
{
Expand Down
55 changes: 54 additions & 1 deletion PhpAmqpLib/Wire/IO/SocketIO.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use PhpAmqpLib\Exception\AMQPHeartbeatMissedException;
use PhpAmqpLib\Exception\AMQPIOException;
use PhpAmqpLib\Exception\AMQPIOWaitException;
use PhpAmqpLib\Exception\AMQPSocketException;
use PhpAmqpLib\Helper\MiscHelper;
use PhpAmqpLib\Wire\AMQPWriter;
Expand Down Expand Up @@ -36,6 +37,9 @@ class SocketIO extends AbstractIO
/** @var bool */
private $keepalive;

/** @var array */
private $last_error;

/**
* @param string $host
* @param int $port
Expand Down Expand Up @@ -205,8 +209,17 @@ public function select($sec, $usec)
$read = array($this->sock);
$write = null;
$except = null;
$result = false;

$this->set_error_handler();
try {
$result = socket_select($read, $write, $except, $sec, $usec);
$this->cleanup_error_handler();
} catch (\ErrorException $e) {
throw new AMQPIOWaitException($e->getMessage(), $e->getCode(), $e);
}

return socket_select($read, $write, $except, $sec, $usec);
return $result;
}

/**
Expand Down Expand Up @@ -258,4 +271,44 @@ protected function write_heartbeat()
$pkt->write_octet(0xCE);
$this->write($pkt->getvalue());
}

public function error_handler($errno, $errstr, $errfile, $errline, $errcontext = null)
{
// socket_select warning that it has been interrupted by a signal - EINTR
if (false !== strrpos($errstr, socket_strerror(SOCKET_EINTR))) {
// it's allowed while processing signals
return;
}

$this->last_error = compact('errno', 'errstr', 'errfile', 'errline', 'errcontext');
}

/**
* Begin tracking errors and set the error handler
*/
protected function set_error_handler()
{
$this->last_error = null;
socket_clear_error($this->sock);
set_error_handler(array($this, 'error_handler'));
}

/**
* throws an ErrorException if an error was handled
* @throws \ErrorException
*/
protected function cleanup_error_handler()
{
restore_error_handler();

if ($this->last_error !== null) {
throw new \ErrorException(
$this->last_error['errstr'],
0,
$this->last_error['errno'],
$this->last_error['errfile'],
$this->last_error['errline']
);
}
}
}
11 changes: 3 additions & 8 deletions demo/amqp_consumer_non_blocking.php
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,7 @@ function shutdown($channel, $connection)

// Loop as long as the channel has callbacks registered
while (count($channel->callbacks)) {
$read = array($connection->getSocket()); // add here other sockets that you need to attend
$write = null;
$except = null;
if (false === ($changeStreamsCount = stream_select($read, $write, $except, 60))) {
/* Error handling */
} elseif ($changeStreamsCount > 0 || $channel->hasPendingMethods()) {
$channel->wait();
}
$channel->wait(null, true);
// do something else
usleep(300000);
}
Loading