From eeb6967b5e906de50b9d3b37720388f6ce8524c1 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Fri, 12 Jan 2018 16:43:59 +0200 Subject: [PATCH] [amqp] fix signal handler if consume called from consume --- pkg/amqp-bunny/AmqpContext.php | 13 ++++--------- pkg/amqp-lib/AmqpContext.php | 13 ++++--------- 2 files changed, 8 insertions(+), 18 deletions(-) diff --git a/pkg/amqp-bunny/AmqpContext.php b/pkg/amqp-bunny/AmqpContext.php index 911eff8c6..5ed3d7973 100644 --- a/pkg/amqp-bunny/AmqpContext.php +++ b/pkg/amqp-bunny/AmqpContext.php @@ -55,11 +55,6 @@ class AmqpContext implements InteropAmqpContext, DelayStrategyAware */ private $subscribers; - /** - * @var SignalSocketHelper - */ - private $signalSocketHandler; - /** * Callable must return instance of \Bunny\Channel once called. * @@ -85,7 +80,6 @@ public function __construct($bunnyChannel, $config = []) $this->buffer = new Buffer(); $this->subscribers = []; - $this->signalSocketHandler = new SignalSocketHelper(); } /** @@ -396,18 +390,19 @@ public function consume($timeout = 0) throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming'); } - $this->signalSocketHandler->beforeSocket(); + $signalHandler = new SignalSocketHelper(); + $signalHandler->beforeSocket(); try { $this->getBunnyChannel()->getClient()->run(0 !== $timeout ? $timeout / 1000 : null); } catch (ClientException $e) { - if ('stream_select() failed.' == $e->getMessage() && $this->signalSocketHandler->wasThereSignal()) { + if ('stream_select() failed.' == $e->getMessage() && $signalHandler->wasThereSignal()) { return; } throw $e; } finally { - $this->signalSocketHandler->afterSocket(); + $signalHandler->afterSocket(); } } diff --git a/pkg/amqp-lib/AmqpContext.php b/pkg/amqp-lib/AmqpContext.php index 3378ebc53..f0b8185a0 100644 --- a/pkg/amqp-lib/AmqpContext.php +++ b/pkg/amqp-lib/AmqpContext.php @@ -57,11 +57,6 @@ class AmqpContext implements InteropAmqpContext, DelayStrategyAware */ private $subscribers; - /** - * @var SignalSocketHelper - */ - private $signalSocketHandler; - /** * @param AbstractConnection $connection * @param array $config @@ -78,7 +73,6 @@ public function __construct(AbstractConnection $connection, $config = []) $this->connection = $connection; $this->buffer = new Buffer(); $this->subscribers = []; - $this->signalSocketHandler = new SignalSocketHelper(); } /** @@ -390,7 +384,8 @@ public function consume($timeout = 0) throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming'); } - $this->signalSocketHandler->beforeSocket(); + $signalHandler = new SignalSocketHelper(); + $signalHandler->beforeSocket(); try { while (true) { @@ -413,13 +408,13 @@ public function consume($timeout = 0) } catch (AMQPTimeoutException $e) { } catch (StopBasicConsumptionException $e) { } catch (AMQPIOWaitException $e) { - if ($this->signalSocketHandler->wasThereSignal()) { + if ($signalHandler->wasThereSignal()) { return; } throw $e; } finally { - $this->signalSocketHandler->afterSocket(); + $signalHandler->afterSocket(); } }