Skip to content

Commit

Permalink
Merge pull request #317 from php-enqueue/amqp-signal-socket-issue-fix
Browse files Browse the repository at this point in the history
[amqp] Fix socket and signal issue.
  • Loading branch information
makasim committed Jan 9, 2018
2 parents 8df8b16 + ff5498e commit 72b51c4
Show file tree
Hide file tree
Showing 7 changed files with 297 additions and 6 deletions.
22 changes: 21 additions & 1 deletion pkg/amqp-bunny/AmqpContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

use Bunny\Channel;
use Bunny\Client;
use Bunny\Exception\ClientException;
use Bunny\Message;
use Enqueue\AmqpTools\DelayStrategyAware;
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
use Enqueue\AmqpTools\SignalSocketHelper;
use Interop\Amqp\AmqpBind as InteropAmqpBind;
use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer;
use Interop\Amqp\AmqpContext as InteropAmqpContext;
Expand Down Expand Up @@ -53,6 +55,11 @@ class AmqpContext implements InteropAmqpContext, DelayStrategyAware
*/
private $subscribers;

/**
* @var SignalSocketHelper
*/
private $signalSocketHandler;

/**
* Callable must return instance of \Bunny\Channel once called.
*
Expand All @@ -78,6 +85,7 @@ public function __construct($bunnyChannel, $config = [])

$this->buffer = new Buffer();
$this->subscribers = [];
$this->signalSocketHandler = new SignalSocketHelper();
}

/**
Expand Down Expand Up @@ -388,7 +396,19 @@ public function consume($timeout = 0)
throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming');
}

$this->getBunnyChannel()->getClient()->run(0 !== $timeout ? $timeout / 1000 : null);
$this->signalSocketHandler->beforeSocket();

try {
$this->getBunnyChannel()->getClient()->run(0 !== $timeout ? $timeout / 1000 : null);
} catch (ClientException $e) {
if ('stream_select() failed.' == $e->getMessage() && $this->signalSocketHandler->wasThereSignal()) {
return;
}

throw $e;
} finally {
$this->signalSocketHandler->afterSocket();
}
}

/**
Expand Down
18 changes: 18 additions & 0 deletions pkg/amqp-lib/AmqpContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Enqueue\AmqpTools\DelayStrategyAware;
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
use Enqueue\AmqpTools\SignalSocketHelper;
use Interop\Amqp\AmqpBind as InteropAmqpBind;
use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer;
use Interop\Amqp\AmqpContext as InteropAmqpContext;
Expand All @@ -20,6 +21,7 @@
use Interop\Queue\PsrTopic;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AbstractConnection;
use PhpAmqpLib\Exception\AMQPIOWaitException;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Message\AMQPMessage as LibAMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
Expand Down Expand Up @@ -55,6 +57,11 @@ class AmqpContext implements InteropAmqpContext, DelayStrategyAware
*/
private $subscribers;

/**
* @var SignalSocketHelper
*/
private $signalSocketHandler;

/**
* @param AbstractConnection $connection
* @param array $config
Expand All @@ -71,6 +78,7 @@ public function __construct(AbstractConnection $connection, $config = [])
$this->connection = $connection;
$this->buffer = new Buffer();
$this->subscribers = [];
$this->signalSocketHandler = new SignalSocketHelper();
}

/**
Expand Down Expand Up @@ -382,6 +390,8 @@ public function consume($timeout = 0)
throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming');
}

$this->signalSocketHandler->beforeSocket();

try {
while (true) {
$start = microtime(true);
Expand All @@ -402,6 +412,14 @@ public function consume($timeout = 0)
}
} catch (AMQPTimeoutException $e) {
} catch (StopBasicConsumptionException $e) {
} catch (AMQPIOWaitException $e) {
if ($this->signalSocketHandler->wasThereSignal()) {
return;
}

throw $e;
} finally {
$this->signalSocketHandler->afterSocket();
}
}

Expand Down
83 changes: 83 additions & 0 deletions pkg/amqp-tools/SignalSocketHelper.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
<?php

namespace Enqueue\AmqpTools;

class SignalSocketHelper
{
/**
* @var callable[]
*/
private $handlers;

/**
* @var bool
*/
private $wasThereSignal;

/**
* @var int[]
*/
private $signals = [SIGTERM, SIGQUIT, SIGINT];

public function __construct()
{
$this->handlers = [];
}

public function beforeSocket()
{
// PHP 7.1 and higher
if (false == function_exists('pcntl_signal_get_handler')) {
return;
}

if ($this->handlers) {
throw new \LogicException('The handlers property should be empty but it is not. The afterSocket method might not have been called.');
}
if (null !== $this->wasThereSignal) {
throw new \LogicException('The wasThereSignal property should be null but it is not. The afterSocket method might not have been called.');
}

$this->wasThereSignal = false;

foreach ($this->signals as $signal) {
/** @var callable $handler */
$handler = pcntl_signal_get_handler($signal);

pcntl_signal($signal, function ($signal) use ($handler) {
var_dump('fuckk!');
$this->wasThereSignal = true;

$handler && $handler($signal);
});

$handler && $this->handlers[$signal] = $handler;
}
}

public function afterSocket()
{
// PHP 7.1 and higher
if (false == function_exists('pcntl_signal_get_handler')) {
return;
}

$this->wasThereSignal = null;

foreach ($this->signals as $signal) {
$handler = isset($this->handlers[$signal]) ? $this->handlers[$signal] : SIG_DFL;

pcntl_signal($signal, $handler);
}

$this->handlers = [];
}

/**
* @return bool
*/
public function wasThereSignal()
{
return (bool) $this->wasThereSignal;
}
}
124 changes: 124 additions & 0 deletions pkg/amqp-tools/Tests/SignalSocketHelperTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
<?php

namespace Enqueue\AmqpTools\Tests;

use Enqueue\AmqpTools\SignalSocketHelper;
use PHPUnit\Framework\TestCase;

class SignalSocketHelperTest extends TestCase
{
/**
* @var SignalSocketHelper
*/
private $signalHelper;

private $backupSigTermHandler;

private $backupSigIntHandler;

public function setUp()
{
parent::setUp();

if (false == function_exists('pcntl_signal_get_handler')) {
$this->markTestSkipped('PHP 7.1 and higher');
}

$this->backupSigTermHandler = pcntl_signal_get_handler(SIGTERM);
$this->backupSigIntHandler = pcntl_signal_get_handler(SIGINT);

pcntl_signal(SIGTERM, SIG_DFL);
pcntl_signal(SIGINT, SIG_DFL);

$this->signalHelper = new SignalSocketHelper();
}

public function tearDown()
{
parent::tearDown();

if ($this->signalHelper) {
$this->signalHelper->afterSocket();
}

if ($this->backupSigTermHandler) {
pcntl_signal(SIGTERM, $this->backupSigTermHandler);
}

if ($this->backupSigIntHandler) {
pcntl_signal(SIGINT, $this->backupSigIntHandler);
}
}

public function testShouldReturnFalseByDefault()
{
$this->assertFalse($this->signalHelper->wasThereSignal());
}

public function testShouldRegisterHandlerOnBeforeSocket()
{
$this->signalHelper->beforeSocket();

$this->assertAttributeSame(false, 'wasThereSignal', $this->signalHelper);
$this->assertAttributeSame([], 'handlers', $this->signalHelper);
}

public function testShouldRegisterHandlerOnBeforeSocketAndBackupCurrentOne()
{
$handler = function () {};

pcntl_signal(SIGTERM, $handler);

$this->signalHelper->beforeSocket();

$this->assertAttributeSame(false, 'wasThereSignal', $this->signalHelper);

$handlers = $this->readAttribute($this->signalHelper, 'handlers');

$this->assertInternalType('array', $handlers);
$this->assertArrayHasKey(SIGTERM, $handlers);
$this->assertSame($handler, $handlers[SIGTERM]);
}

public function testRestoreDefaultPropertiesOnAfterSocket()
{
$this->signalHelper->beforeSocket();
$this->signalHelper->afterSocket();

$this->assertAttributeSame(null, 'wasThereSignal', $this->signalHelper);
$this->assertAttributeSame([], 'handlers', $this->signalHelper);
}

public function testRestorePreviousHandlerOnAfterSocket()
{
$handler = function () {};

pcntl_signal(SIGTERM, $handler);

$this->signalHelper->beforeSocket();
$this->signalHelper->afterSocket();

$this->assertSame($handler, pcntl_signal_get_handler(SIGTERM));
}

public function testThrowsIfBeforeSocketCalledSecondTime()
{
$this->signalHelper->beforeSocket();

$this->expectException(\LogicException::class);
$this->expectExceptionMessage('The wasThereSignal property should be null but it is not. The afterSocket method might not have been called.');
$this->signalHelper->beforeSocket();
}

public function testShouldReturnTrueOnWasThereSignal()
{
$this->signalHelper->beforeSocket();

posix_kill(getmypid(), SIGINT);
pcntl_signal_dispatch();

$this->assertTrue($this->signalHelper->wasThereSignal());

$this->signalHelper->afterSocket();
}
}
17 changes: 14 additions & 3 deletions pkg/enqueue/Consumption/Extension/SignalExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ public function onStart(Context $context)
throw new LogicException('The pcntl extension is required in order to catch signals.');
}

if (function_exists('pcntl_async_signals')) {
pcntl_async_signals(true);
}

pcntl_signal(SIGTERM, [$this, 'handleSignal']);
pcntl_signal(SIGQUIT, [$this, 'handleSignal']);
pcntl_signal(SIGINT, [$this, 'handleSignal']);
Expand All @@ -45,7 +49,7 @@ public function onBeforeReceive(Context $context)
{
$this->logger = $context->getLogger();

pcntl_signal_dispatch();
$this->dispatchSignal();

$this->interruptExecutionIfNeeded($context);
}
Expand All @@ -63,7 +67,7 @@ public function onPreReceived(Context $context)
*/
public function onPostReceived(Context $context)
{
pcntl_signal_dispatch();
$this->dispatchSignal();

$this->interruptExecutionIfNeeded($context);
}
Expand All @@ -73,7 +77,7 @@ public function onPostReceived(Context $context)
*/
public function onIdle(Context $context)
{
pcntl_signal_dispatch();
$this->dispatchSignal();

$this->interruptExecutionIfNeeded($context);
}
Expand Down Expand Up @@ -117,4 +121,11 @@ public function handleSignal($signal)
break;
}
}

private function dispatchSignal()
{
if (false == function_exists('pcntl_async_signals')) {
pcntl_signal_dispatch();
}
}
}
6 changes: 4 additions & 2 deletions pkg/enqueue/Symfony/AmqpTransportFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,11 @@ public function addConfiguration(ArrayNodeDefinition $builder)
throw new \InvalidArgumentException('There is no amqp driver available. Please consider installing one of the packages: enqueue/amqp-ext, enqueue/amqp-lib, enqueue/amqp-bunny.');
}

if (isset($v['driver']) && false == in_array($v['driver'], $drivers, true)) {
throw new \InvalidArgumentException(sprintf('Unexpected driver given "invalidDriver". Available are "%s"', implode('", "', $drivers)));
if ($v && false == in_array($v, $drivers, true)) {
throw new \InvalidArgumentException(sprintf('Unexpected driver given "%s". Available are "%s"', $v, implode('", "', $drivers)));
}

return $v;
})
->end()
->end()
Expand Down

0 comments on commit 72b51c4

Please sign in to comment.