Skip to content
Permalink
Branch: master
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
66 lines (54 sloc) 1.8 KB
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Transport\Receiver;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
*
* @experimental in 4.2
*/
class StopWhenMessageCountIsExceededReceiver implements ReceiverInterface
{
private $decoratedReceiver;
private $maximumNumberOfMessages;
private $logger;
public function __construct(ReceiverInterface $decoratedReceiver, int $maximumNumberOfMessages, LoggerInterface $logger = null)
{
$this->decoratedReceiver = $decoratedReceiver;
$this->maximumNumberOfMessages = $maximumNumberOfMessages;
$this->logger = $logger;
}
public function receive(callable $handler): void
{
$receivedMessages = 0;
$this->decoratedReceiver->receive(function (?Envelope $envelope) use ($handler, &$receivedMessages) {
$handler($envelope);
if (null !== $envelope && ++$receivedMessages >= $this->maximumNumberOfMessages) {
$this->stop();
if (null !== $this->logger) {
$this->logger->info('Receiver stopped due to maximum count of {count} exceeded', ['count' => $this->maximumNumberOfMessages]);
}
}
});
}
public function stop(): void
{
$this->decoratedReceiver->stop();
}
public function ack(Envelope $envelope): void
{
$this->decoratedReceiver->ack($envelope);
}
public function reject(Envelope $envelope): void
{
$this->decoratedReceiver->reject($envelope);
}
}
You can’t perform that action at this time.