Skip to content

Commit

Permalink
Heartbeat tests (#25)
Browse files Browse the repository at this point in the history
  • Loading branch information
TomK committed Jul 12, 2021
1 parent c7b9c26 commit d533348
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 27 deletions.
91 changes: 68 additions & 23 deletions src/Provider/Amqp/AmqpQueueProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ public function consume(callable $callback)
}
catch(AMQPHeartbeatMissedException $e)
{
$this->_disconnect(self::CONN_CONSUME);
$this->disconnect(self::CONN_CONSUME);
$retry = true;
}
catch(AMQPProtocolChannelException $e)
Expand Down Expand Up @@ -300,10 +300,17 @@ public function consume(callable $callback)
// replace callback for this consumer
$channel->callbacks[$consumerId] = $this->_fixedConsumerCallback;
}

// consumers bound, wait for message
try
{
$channel->wait(null, false, $this->_getWaitTime());
}
catch(AMQPHeartbeatMissedException $e)
{
$this->disconnect(self::CONN_CONSUME);
return false;
}
catch(AMQPTimeoutException $e)
{
return false;
Expand Down Expand Up @@ -399,14 +406,30 @@ public static function create(

public function ack($deliveryTag)
{
$this->_getChannel(self::CONN_CONSUME)
->basic_ack($deliveryTag, false);
try
{
$this->_getChannel(self::CONN_CONSUME)
->basic_ack($deliveryTag, false);
}
catch(AMQPHeartbeatMissedException $e)
{
$this->disconnect(self::CONN_CONSUME);
$this->ack($deliveryTag);
}
}

public function nack($deliveryTag, $requeueFailures = false)
{
$this->_getChannel(self::CONN_CONSUME)
->basic_reject($deliveryTag, $requeueFailures);
try
{
$this->_getChannel(self::CONN_CONSUME)
->basic_reject($deliveryTag, $requeueFailures);
}
catch(AMQPHeartbeatMissedException $e)
{
$this->disconnect(self::CONN_CONSUME);
$this->nack($deliveryTag, $requeueFailures);
}
}

public function batchAck(array $tagResults, $requeueFailures = false)
Expand Down Expand Up @@ -556,7 +579,7 @@ protected function _getConnection($connectionMode)
}
catch(Exception $e)
{
$this->_log('AMQP host failed to connect (' . $host . ')');
$this->_log('AMQP host failed to connect [' . $e->getMessage() . '] (' . $host . ')');
array_shift($this->_hosts);
}
$this->_persistentDefault = ValueAs::bool($config->getItem('persistent', false));
Expand All @@ -574,6 +597,7 @@ protected function _getConnection($connectionMode)
}
catch(AMQPRuntimeException $e)
{
$this->_log('Unable to start heartbeat sender. ' . $e->getMessage());
}

return $this->_connections[$connectionMode];
Expand Down Expand Up @@ -616,7 +640,7 @@ protected function _getChannel($connectionMode)
catch(Exception $e)
{
$this->_log(
'Error getting AMQP channel (' . $retries . ' retries remaining)'
'Error getting AMQP channel [' . $e->getMessage() . '] (' . $retries . ' retries remaining) '
);
$this->disconnect($connectionMode);
if(!($retries--))
Expand Down Expand Up @@ -659,36 +683,57 @@ public function disconnect($connectionMode = null)

private function _disconnect($connectionMode)
{
try
if((!empty($this->_channels[$connectionMode]))
&& ($this->_channels[$connectionMode] instanceof AMQPChannel)
)
{
if((!empty($this->_channels[$connectionMode]))
&& ($this->_channels[$connectionMode] instanceof AMQPChannel)
)
try
{
$this->_channels[$connectionMode]->wait_for_pending_acks_returns($this->_getPushTimeout());
}
catch(\Throwable $e)
{
}
try
{
$this->_channels[$connectionMode]->basic_cancel($this->_getConsumerId());
}
catch(\Throwable $e)
{
}
try
{
$this->_channels[$connectionMode]->close();
}
}
catch(Exception $e)
{
catch(\Throwable $e)
{
}
}
$this->_channels[$connectionMode] = null;

if($this->_heartbeatSender)
{
$this->_heartbeatSender->unregister();
$this->_heartbeatSender = null;
try
{
$this->_heartbeatSender->unregister();
}
catch(\Throwable $e)
{
}
}
try
$this->_heartbeatSender = null;

if((!empty($this->_connections[$connectionMode]))
&& ($this->_connections[$connectionMode] instanceof AbstractConnection)
)
{
if((!empty($this->_connections[$connectionMode]))
&& ($this->_connections[$connectionMode] instanceof AbstractConnection)
)
try
{
$this->_connections[$connectionMode]->close();
}
}
catch(Exception $e)
{
catch(\Throwable $e)
{
}
}
$this->_connections[$connectionMode] = null;
}
Expand Down
33 changes: 29 additions & 4 deletions tests/Provider/AmqpTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,42 @@

use Packaged\Config\ConfigSectionInterface;
use Packaged\Config\Provider\ConfigSection;
use Packaged\Queue\Provider\Amqp\AmqpQueueProvider;
use Packaged\Queue\Tests\Provider\Mock\AmqpMockProvider;

class AmqpTest extends \PHPUnit_Framework_TestCase
{
protected function _getProvider(string $queue, ?string $exchange = null)
{
$q = AmqpQueueProvider::create($queue, $exchange);
$q->configure(new ConfigSection('', ['heartbeat' => 2]));
$q = AmqpMockProvider::create($queue, $exchange);
$q->configure(new ConfigSection('', ['heartbeat' => 4, 'read_write_timeout' => 3]));
$q->deleteQueueAndExchange();
return $q;
}

public function testFailHeartbeat()
{
$q = $this->_getProvider('test_heartbeat')->unregisterHeartbeat();
$q->declareExchange()
->declareQueue()
->bindQueue();
$q->push($q->config()->getItem('heartbeat'));
self::assertEquals(0, $q->getDisconnectCount());
$q->consume(
function ($msg, $tag) use ($q) {

$timeLeft = (int)$msg * 3;
while($timeLeft > 0)
{
$timeLeft = sleep($timeLeft);
}

$q->ack($tag);
// expect one reconnect
self::assertEquals(1, $q->getDisconnectCount());
}
);
}

public function testAmqp()
{
$q = $this->_getProvider('test', 'testexchange');
Expand Down Expand Up @@ -207,7 +232,7 @@ public function testMandatory(
$config, $queueName, $createExchange, $createQueue, $createBinding
)
{
$q = $this->_getProvider($queueName)->deleteQueueAndExchange();
$q = $this->_getProvider($queueName);
$q->configure(new ConfigSection('', $config));

if($createExchange)
Expand Down
42 changes: 42 additions & 0 deletions tests/Provider/Mock/AmqpMockProvider.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
<?php

namespace Packaged\Queue\Tests\Provider\Mock;

use Packaged\Queue\Provider\Amqp\AmqpQueueProvider;

class AmqpMockProvider extends AmqpQueueProvider
{
protected $_disconnectCount = 0;
protected $_unregisterHeartbeat = false;

protected function _getConnection($connectionMode)
{
$conn = parent::_getConnection($connectionMode);
if($this->_unregisterHeartbeat && $this->_heartbeatSender)
{
$this->_heartbeatSender->unregister();
}
return $conn;
}

public function unregisterHeartbeat()
{
$this->_unregisterHeartbeat = true;
if($this->_unregisterHeartbeat && $this->_heartbeatSender)
{
$this->_heartbeatSender->unregister();
}
return $this;
}

public function getDisconnectCount()
{
return $this->_disconnectCount;
}

public function disconnect($connectionMode = null)
{
parent::disconnect($connectionMode);
$this->_disconnectCount++;
}
}

0 comments on commit d533348

Please sign in to comment.