diff --git a/CHANGELOG.md b/CHANGELOG.md index 4c9b9b8..3117962 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,15 @@ ## [Unreleased] +### Added +- Added retry exchange for rewind message in queue with delay. +- Added config parameters `idle_timeout` and `wait_timeout`. +- Added publishers: `FifoPublisher`, `DelayPublisher`, `FifoPublisher`, `DeduplicatePublisher`, `DeduplicateDelayPublisher`. + +### Changed +- Optimized receiving a batch of messages in `ConsumerRunCommand`. +- Extended supported queue types by `Delay`, `Deduplicate`. + +### Fixed +- Fix rewind and release partial messages by delivery tag. Changed `ReleasePartialException`, `RewindDelayPartialException`, `RewindPartialException`. ## [0.1.1] - 2021-01-14 ### Changed diff --git a/Client/RabbitMqClient.php b/Client/RabbitMqClient.php index 7d71c70..89a69c7 100644 --- a/Client/RabbitMqClient.php +++ b/Client/RabbitMqClient.php @@ -4,6 +4,9 @@ namespace Wakeapp\Bundle\RabbitQueueBundle\Client; +use PhpAmqpLib\Wire\AMQPTable; +use Wakeapp\Bundle\RabbitQueueBundle\Enum\ExchangeEnum; +use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueHeaderOptionEnum; use ErrorException; use Exception; use InvalidArgumentException; @@ -37,9 +40,9 @@ public function isConsuming(): bool * @throws AMQPTimeoutException * @throws ErrorException */ - public function wait() + public function wait(int $timeout = 0) { - return $this->channel->wait(); + return $this->channel->wait(null, false, $timeout); } /** @@ -77,15 +80,26 @@ public function countNotTakenMessages(string $queueName) /** * @param string $queueName * @param AMQPMessage[] $messageList + * @param int $delay */ public function rewindList( string $queueName, - array $messageList + array $messageList, + int $delay = 0 ): void { $this->ackList($messageList); foreach ($messageList as $message) { - $this->channel->batch_basic_publish($message, '', $queueName); + $headers = $message->has('application_headers') ? $message->get('application_headers') : new AMQPTable(); + + $retryCount = $headers->getNativeData()[QueueHeaderOptionEnum::X_RETRY] ?? 0; + + $headers->set(QueueHeaderOptionEnum::X_DELAY, $delay * 1000); + $headers->set(QueueHeaderOptionEnum::X_RETRY, ++$retryCount); + + $message->set('application_headers', $headers); + + $this->channel->batch_basic_publish($message, ExchangeEnum::RETRY_EXCHANGE, $queueName); } $this->channel->publish_batch(); @@ -108,7 +122,6 @@ public function ackList(array $messageList): void /** * @param AMQPMessage[] $messageList - * @param bool $multiple * @param bool $requeue */ public function nackList(array $messageList, bool $requeue = true): void @@ -147,4 +160,23 @@ public function __destruct() $this->channel->close(); $this->connection->close(); } + + /** + * @param AMQPMessage[] $messageList + * @param string|null $exchangeName + * @param string|null $queueName + */ + public function publishBatch(array $messageList, string $exchangeName = null, string $queueName = null): void + { + foreach ($messageList as $message) { + $this->channel->batch_basic_publish($message, $exchangeName, $queueName); + } + + $this->channel->publish_batch(); + } + + public function publish(AMQPMessage $message, string $exchangeName = null, string $queueName = null): void + { + $this->channel->basic_publish($message, $exchangeName, $queueName); + } } diff --git a/Command/ConsumerRunCommand.php b/Command/ConsumerRunCommand.php index 383b6dc..9249ff0 100644 --- a/Command/ConsumerRunCommand.php +++ b/Command/ConsumerRunCommand.php @@ -4,14 +4,9 @@ namespace Wakeapp\Bundle\RabbitQueueBundle\Command; -use Wakeapp\Bundle\RabbitQueueBundle\Client\RabbitMqClient; -use Wakeapp\Bundle\RabbitQueueBundle\Exception\ConsumerNotFoundException; -use Wakeapp\Bundle\RabbitQueueBundle\Exception\ConsumerSilentException; -use Wakeapp\Bundle\RabbitQueueBundle\Exception\ReleasePartialException; -use Wakeapp\Bundle\RabbitQueueBundle\Exception\RewindPartialException; -use Wakeapp\Bundle\RabbitQueueBundle\Registry\ConsumerRegistry; use Exception; use JsonException; +use PhpAmqpLib\Exception\AMQPTimeoutException; use PhpAmqpLib\Message\AMQPMessage; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; @@ -20,16 +15,29 @@ use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; +use Symfony\Component\DependencyInjection\ParameterBag\ParameterBagInterface; +use Wakeapp\Bundle\RabbitQueueBundle\Client\RabbitMqClient; +use Wakeapp\Bundle\RabbitQueueBundle\Consumer\ConsumerInterface; +use Wakeapp\Bundle\RabbitQueueBundle\Exception\ConsumerNotFoundException; +use Wakeapp\Bundle\RabbitQueueBundle\Exception\ConsumerSilentException; +use Wakeapp\Bundle\RabbitQueueBundle\Exception\ReleasePartialException; +use Wakeapp\Bundle\RabbitQueueBundle\Exception\RewindDelayPartialException; +use Wakeapp\Bundle\RabbitQueueBundle\Exception\RewindPartialException; +use Wakeapp\Bundle\RabbitQueueBundle\Registry\ConsumerRegistry; +use Wakeapp\Bundle\RabbitQueueBundle\Registry\DefinitionRegistry; -use const JSON_THROW_ON_ERROR; - +use function array_diff_key; +use function array_flip; +use function array_intersect_key; use function array_map; use function count; use function explode; use function in_array; use function ini_get; -use function pcntl_signal; use function json_encode; +use function pcntl_signal; + +use const JSON_THROW_ON_ERROR; class ConsumerRunCommand extends Command { @@ -37,15 +45,21 @@ class ConsumerRunCommand extends Command private ConsumerRegistry $consumerRegistry; private RabbitMqClient $client; + private DefinitionRegistry $definitionRegistry; + private ParameterBagInterface $parameterBag; private ?LoggerInterface $logger; public function dependencyInjection( ConsumerRegistry $consumerRegistry, RabbitMqClient $client, + DefinitionRegistry $definitionRegistry, + ParameterBagInterface $parameterBag, ?LoggerInterface $logger = null ): void { $this->consumerRegistry = $consumerRegistry; $this->client = $client; + $this->definitionRegistry = $definitionRegistry; + $this->parameterBag = $parameterBag; $this->logger = $logger ?? new NullLogger(); } @@ -106,62 +120,111 @@ protected function execute(InputInterface $input, OutputInterface $output): int $messageList = []; $this->client->qos($batchSize); - $this->client->consume($queueName, $name, function (AMQPMessage $message) use ($consumer, &$messageList) { - $queueName = $consumer->getBindQueueName(); - $batchSize = $consumer->getBatchSize(); - $notTakenMessagesCount = $this->client->countNotTakenMessages($queueName); - - $messageList[] = $message; + $this->client->consume($queueName, $name, function (AMQPMessage $message) use (&$messageList) { + $messageList[$message->getDeliveryTag()] = $message; + }); - if ($notTakenMessagesCount > 0 && count($messageList) < $batchSize) { - return; + while ($this->client->isConsuming()) { + if (count($messageList) === $batchSize) { + $this->batchConsume($consumer, $messageList); } + $timeout = empty($messageList) ? $this->getIdleTimeout() : $this->getWaitTimeout(); + try { - $consumer->process($messageList); + $this->client->wait($timeout); + } catch (AMQPTimeoutException $e) { + if (!empty($messageList)) { + $this->batchConsume($consumer, $messageList); + } + } + } + + return self::SUCCESS; + } - $this->client->ackList($messageList); + protected function batchConsume(ConsumerInterface $consumer, array &$messageList): void + { + try { + $consumer->process($messageList); + $this->client->ackList($messageList); - $consumer->incrementProcessedTasksCounter(); + $consumer->incrementProcessedTasksCounter(); - $maxProcessedTasksCount = $consumer->getMaxProcessedTasksCount(); + $maxProcessedTasksCount = $consumer->getMaxProcessedTasksCount(); - if ($maxProcessedTasksCount > 0 && $maxProcessedTasksCount <= $consumer->getProcessedTasksCounter()) { - $consumer->stopPropagation(); - } - } catch (RewindPartialException $exception) { - $rewindMessageList = $exception->getRewindMessageList(); + if ($maxProcessedTasksCount > 0 && $maxProcessedTasksCount <= $consumer->getProcessedTasksCounter()) { + $consumer->stopPropagation(); + } + } catch (RewindPartialException $exception) { + $rewindDeliveryTagList = $exception->getRewindDeliveryTagList(); - $this->client->rewindList($queueName, $rewindMessageList); - } catch (ConsumerSilentException $exception) { - $this->client->nackList($messageList); - } catch (ReleasePartialException $exception) { - $releaseMessageList = $exception->getReleaseMessageList(); + [$rewindMessageList, $ackMessageList] = $this->getPartialMessageList($messageList, $rewindDeliveryTagList); - $this->client->nackList($releaseMessageList); - } catch (Exception $exception) { - $this->client->nackList($messageList); + $this->client->rewindList($consumer->getBindQueueName(), $rewindMessageList); + $this->client->ackList($ackMessageList); + } catch (RewindDelayPartialException $exception) { + $definition = $this->definitionRegistry->getDefinition($consumer->getBindQueueName()); + $rewindDeliveryTagList = $exception->getRewindDeliveryTagList(); - $this->logger->warning('Error process queue: {errorMessage}', [ - 'errorMessage' => $exception->getMessage(), - ]); + [$rewindMessageList, $ackMessageList] = $this->getPartialMessageList($messageList, $rewindDeliveryTagList); - throw $exception; - } finally { - $messageList = []; + $this->client->rewindList($definition::getQueueName(), $rewindMessageList, $exception->getDelay()); + $this->client->ackList($ackMessageList); + } catch (ConsumerSilentException $exception) { + $this->client->nackList($messageList); + } catch (ReleasePartialException $exception) { + $releaseDeliveryTagList = $exception->getReleaseDeliveryTagList(); - if ($consumer->isPropagationStopped()) { - $this->logger->info('Consumer has been propagation stopped forcibly'); + [$releaseMessageList, $ackMessageList] = $this->getPartialMessageList( + $messageList, + $releaseDeliveryTagList + ); - exit(0); - } - } - }); + $this->client->nackList($releaseMessageList); + $this->client->ackList($ackMessageList); + } catch (Exception $exception) { + $this->client->nackList($messageList); - while ($this->client->isConsuming()) { - $this->client->wait(); + $this->logger->warning('Error process queue: {errorMessage}', [ + 'errorMessage' => $exception->getMessage(), + ]); + + throw $exception; + } finally { + $messageList = []; + + if ($consumer->isPropagationStopped()) { + $this->logger->info('Consumer has been propagation stopped forcibly'); + + exit(0); + } } + } - return self::SUCCESS; + /** + * @param AMQPMessage[] $messageList + * @param int[] $deliveryTagList + * + * @return array> + */ + private function getPartialMessageList(array $messageList, array $deliveryTagList): array + { + $deliveryTagList = array_flip($deliveryTagList); + + $intersectMessageList = array_intersect_key($messageList, $deliveryTagList); + $diffMessageList = array_diff_key($messageList, $deliveryTagList); + + return [$intersectMessageList, $diffMessageList]; + } + + private function getIdleTimeout(): int + { + return $this->parameterBag->get('wakeapp_rabbit_queue.consumer.idle_timeout'); + } + + private function getWaitTimeout(): int + { + return $this->parameterBag->get('wakeapp_rabbit_queue.consumer.wait_timeout'); } } diff --git a/Command/UpdateDefinitionCommand.php b/Command/UpdateDefinitionCommand.php index 426a073..2e8afb5 100644 --- a/Command/UpdateDefinitionCommand.php +++ b/Command/UpdateDefinitionCommand.php @@ -4,11 +4,14 @@ namespace Wakeapp\Bundle\RabbitQueueBundle\Command; -use Wakeapp\Bundle\RabbitQueueBundle\Definition\DefinitionInterface; use PhpAmqpLib\Connection\AMQPStreamConnection; +use PhpAmqpLib\Exchange\AMQPExchangeType; +use PhpAmqpLib\Wire\AMQPTable; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; +use Wakeapp\Bundle\RabbitQueueBundle\Definition\DefinitionInterface; +use Wakeapp\Bundle\RabbitQueueBundle\Enum\ExchangeEnum; class UpdateDefinitionCommand extends Command { @@ -56,8 +59,29 @@ protected function execute(InputInterface $input, OutputInterface $output): int { foreach ($this->definitionList as $definition) { $definition->init($this->connection); + + $this->bindRetryExchange($definition); } return self::SUCCESS; } + + private function bindRetryExchange(DefinitionInterface $definition): void + { + $queueName = $definition::getQueueName(); + $channel = $this->connection->channel(); + + $channel->exchange_declare( + ExchangeEnum::RETRY_EXCHANGE, + 'x-delayed-message', + false, + true, + false, + false, + false, + new AMQPTable(['x-delayed-type' => AMQPExchangeType::DIRECT]) + ); + + $channel->queue_bind($queueName, ExchangeEnum::RETRY_EXCHANGE, $queueName); + } } diff --git a/DependencyInjection/Configuration.php b/DependencyInjection/Configuration.php index 62bd2e9..5e3bbee 100644 --- a/DependencyInjection/Configuration.php +++ b/DependencyInjection/Configuration.php @@ -34,6 +34,17 @@ public function getConfigTreeBuilder(): TreeBuilder ->end() ->end() ->end() + ->arrayNode('consumer') + ->addDefaultsIfNotSet() + ->children() + ->integerNode('idle_timeout') + ->defaultValue(0) + ->end() + ->integerNode('wait_timeout') + ->defaultValue(3) + ->end() + ->end() + ->end() ->end() ; diff --git a/DependencyInjection/WakeappRabbitQueueExtension.php b/DependencyInjection/WakeappRabbitQueueExtension.php index 9c1d258..edddfa2 100644 --- a/DependencyInjection/WakeappRabbitQueueExtension.php +++ b/DependencyInjection/WakeappRabbitQueueExtension.php @@ -7,6 +7,7 @@ use Wakeapp\Bundle\RabbitQueueBundle\Consumer\ConsumerInterface; use Wakeapp\Bundle\RabbitQueueBundle\Definition\DefinitionInterface; use Wakeapp\Bundle\RabbitQueueBundle\Hydrator\HydratorInterface; +use Wakeapp\Bundle\RabbitQueueBundle\Publisher\PublisherInterface; use Exception; use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException; use Symfony\Component\Config\FileLocator; @@ -31,6 +32,7 @@ public function load(array $configs, ContainerBuilder $container): void $container->setParameter('wakeapp_rabbit_queue.hydrator_name', $config['hydrator_name']); $this->setConnectionParams($container, $config['connections']); + $this->setConsumerParams($container, $config['consumer']); $loader = new YamlFileLoader($container, new FileLocator(__DIR__ . '/../Resources/config')); $loader->load('services.yaml'); @@ -49,6 +51,11 @@ public function load(array $configs, ContainerBuilder $container): void ->registerForAutoconfiguration(HydratorInterface::class) ->addTag(HydratorInterface::TAG) ; + + $container + ->registerForAutoconfiguration(PublisherInterface::class) + ->addTag(PublisherInterface::TAG) + ; } private function setConnectionParams(ContainerBuilder $container, array $connections): void @@ -70,4 +77,10 @@ private function setConnectionParams(ContainerBuilder $container, array $connect $container->setParameter('wakeapp_rabbit_queue.connection.password', $connection['password']); $container->setParameter('wakeapp_rabbit_queue.connection.vhost', $connection['vhost']); } + + private function setConsumerParams(ContainerBuilder $container, array $consumerConfig): void + { + $container->setParameter('wakeapp_rabbit_queue.consumer.idle_timeout', $consumerConfig['idle_timeout']); + $container->setParameter('wakeapp_rabbit_queue.consumer.wait_timeout', $consumerConfig['wait_timeout']); + } } diff --git a/Enum/ExchangeEnum.php b/Enum/ExchangeEnum.php new file mode 100644 index 0000000..da49542 --- /dev/null +++ b/Enum/ExchangeEnum.php @@ -0,0 +1,10 @@ +releaseMessageList = $releaseMessageList; + $this->releaseDeliveryTagList = $releaseDeliveryTagList; - parent::__construct(sprintf('Consumer "%s" release partial messageList', $name)); + parent::__construct('Consumer release partial message list'); } /** - * @return AMQPMessage[] + * @return int[] */ - public function getReleaseMessageList(): array + public function getReleaseDeliveryTagList(): array { - return $this->releaseMessageList; + return $this->releaseDeliveryTagList; } } diff --git a/Exception/RewindDelayPartialException.php b/Exception/RewindDelayPartialException.php new file mode 100644 index 0000000..b981ce4 --- /dev/null +++ b/Exception/RewindDelayPartialException.php @@ -0,0 +1,51 @@ +rewindDeliveryTagList = $rewindDeliveryTagList; + $this->delay = $delay; + + parent::__construct('Consumer rewind delay partial messageList'); + } + + /** + * @return int[] + */ + public function getRewindDeliveryTagList(): array + { + return $this->rewindDeliveryTagList; + } + + public function getDelay(): int + { + return $this->delay; + } +} diff --git a/Exception/RewindPartialException.php b/Exception/RewindPartialException.php index e9764ae..48caa3d 100644 --- a/Exception/RewindPartialException.php +++ b/Exception/RewindPartialException.php @@ -4,42 +4,40 @@ namespace Wakeapp\Bundle\RabbitQueueBundle\Exception; -use PhpAmqpLib\Message\AMQPMessage; use RuntimeException; -use function sprintf; +use function is_int; class RewindPartialException extends RuntimeException { /** - * @var AMQPMessage[] + * @var int[] */ - private array $rewindMessageList; + private array $rewindDeliveryTagList; /** - * @param string $name - * @param AMQPMessage[] $rewindMessageList + * @param int[] $rewindDeliveryTagList * * @throws RabbitQueueException */ - public function __construct(string $name, array $rewindMessageList) + public function __construct(array $rewindDeliveryTagList) { - foreach ($rewindMessageList as $rewindMessage) { - if (!$rewindMessage instanceof AMQPMessage) { - throw new RabbitQueueException(sprintf('Rewind message must be instance of %s', AMQPMessage::class)); + foreach ($rewindDeliveryTagList as $deliveryTag) { + if (!is_int($deliveryTag)) { + throw new RabbitQueueException('Delivery tag must be integer'); } } - $this->rewindMessageList = $rewindMessageList; + $this->rewindDeliveryTagList = $rewindDeliveryTagList; - parent::__construct(sprintf('Consumer "%s" rewind partial messageList', $name)); + parent::__construct('Consumer rewind partial message list'); } /** - * @return AMQPMessage[] + * @return int[] */ - public function getRewindMessageList(): array + public function getRewindDeliveryTagList(): array { - return $this->rewindMessageList; + return $this->rewindDeliveryTagList; } } diff --git a/Producer/RabbitMqProducer.php b/Producer/RabbitMqProducer.php index 3729cba..579b7ac 100644 --- a/Producer/RabbitMqProducer.php +++ b/Producer/RabbitMqProducer.php @@ -4,39 +4,30 @@ namespace Wakeapp\Bundle\RabbitQueueBundle\Producer; -use Wakeapp\Bundle\RabbitQueueBundle\Definition\DefinitionInterface; -use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueOptionEnum; -use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueTypeEnum; use Wakeapp\Bundle\RabbitQueueBundle\Exception\DefinitionNotFoundException; use Wakeapp\Bundle\RabbitQueueBundle\Exception\HydratorNotFoundException; use Wakeapp\Bundle\RabbitQueueBundle\Exception\RabbitQueueException; use Wakeapp\Bundle\RabbitQueueBundle\Registry\DefinitionRegistry; use Wakeapp\Bundle\RabbitQueueBundle\Registry\HydratorRegistry; -use PhpAmqpLib\Connection\AMQPStreamConnection; -use PhpAmqpLib\Message\AMQPMessage; -use PhpAmqpLib\Wire\AMQPTable; - -use function is_int; -use function is_string; -use function sprintf; +use Wakeapp\Bundle\RabbitQueueBundle\Registry\PublisherRegistry; class RabbitMqProducer implements RabbitMqProducerInterface { - private AMQPStreamConnection $connection; private DefinitionRegistry $definitionRegistry; private HydratorRegistry $hydratorRegistry; + private PublisherRegistry $publisherRegistry; private string $hydratorName; public function __construct( - AMQPStreamConnection $connection, DefinitionRegistry $definitionRegistry, HydratorRegistry $hydratorRegistry, + PublisherRegistry $publisherRegistry, string $hydratorName ) { - $this->connection = $connection; $this->definitionRegistry = $definitionRegistry; $this->hydratorRegistry = $hydratorRegistry; $this->hydratorName = $hydratorName; + $this->publisherRegistry = $publisherRegistry; } /** @@ -51,84 +42,8 @@ public function put(string $queueName, $data, array $options = []): void $definition = $this->definitionRegistry->getDefinition($queueName); $queueType = $definition->getQueueType(); - $isDeduplicateType = QueueTypeEnum::DEDUPLICATE === (QueueTypeEnum::DEDUPLICATE & $queueType); - $isDelayType = QueueTypeEnum::DELAY === (QueueTypeEnum::DELAY & $queueType); - $isFifoType = QueueTypeEnum::FIFO === (QueueTypeEnum::FIFO & $queueType); - - if ($isFifoType && $isDeduplicateType && $isDelayType) { - $this->putToDeduplicateDelay($definition, $dataString, $options); - - return; - } - - if ($isFifoType) { - $this->putToFifo($definition, $dataString); - - return; - } - - throw new RabbitQueueException('Not support queue type.'); - } - - /** - * @throws HydratorNotFoundException - */ - protected function putToFifo(DefinitionInterface $definition, string $dataString): void - { - $queueName = $definition->getEntryPointName(); - - $msg = new AMQPMessage($dataString, [ - 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, - 'content_type' => $this->hydratorRegistry->getHydrator($this->hydratorName)::getKey(), - ]); - - $channel = $this->connection->channel(); - - $channel->basic_publish($msg, '', $queueName); - } - - /** - * @throws RabbitQueueException - * @throws HydratorNotFoundException - */ - protected function putToDeduplicateDelay(DefinitionInterface $definition, string $dataString, array $options): void - { - if (empty($options[QueueOptionEnum::KEY]) || !is_string($options[QueueOptionEnum::KEY])) { - $message = sprintf( - 'Element for queue "%s" must have option "%s" with type string. See %s', - $definition::getQueueName(), - QueueOptionEnum::KEY, - QueueOptionEnum::class - ); - - throw new RabbitQueueException($message); - } - - if (empty($options[QueueOptionEnum::DELAY]) || !is_int($options[QueueOptionEnum::DELAY])) { - $message = sprintf( - 'Element for queue "%s" must have option "%s" with type int. See %s', - $definition::getQueueName(), - QueueOptionEnum::DELAY, - QueueOptionEnum::class - ); - - throw new RabbitQueueException($message); - } - - $exchangeName = $definition->getEntryPointName(); - - $amqpTableOption['x-deduplication-header'] = $options[QueueOptionEnum::KEY]; - $amqpTableOption['x-delay'] = $options[QueueOptionEnum::DELAY] * 1000; - $amqpTableOption['x-cache-ttl'] = $options[QueueOptionEnum::DELAY] * 1000; - - $message = new AMQPMessage($dataString, [ - 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, - 'content_type' => $this->hydratorRegistry->getHydrator($this->hydratorName)::getKey(), - ]); - $message->set('application_headers', new AMQPTable($amqpTableOption)); - - $channel = $this->connection->channel(); + $publisher = $this->publisherRegistry->getPublisher($queueType); - $channel->basic_publish($message, $exchangeName); + $publisher->publish($definition, $dataString, $options); } } diff --git a/Publisher/AbstractPublisher.php b/Publisher/AbstractPublisher.php new file mode 100644 index 0000000..3d2da4f --- /dev/null +++ b/Publisher/AbstractPublisher.php @@ -0,0 +1,76 @@ +hydratorRegistry = $hydratorRegistry; + $this->hydratorName = $hydratorName; + $this->client = $client; + } + + abstract protected function prepareOptions(DefinitionInterface $definition, array $options): array; + + public function publish(DefinitionInterface $definition, string $dataString, array $options = []): void + { + $exchangeName = $this->getDefinitionExchangeName($definition); + $queueName = $this->getDefinitionQueueName($definition); + + $message = new AMQPMessage($dataString, [ + 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, + 'content_type' => $this->hydratorRegistry->getHydrator($this->hydratorName)::getKey(), + ]); + + $amqpTableOptions = $this->prepareOptions($definition, $options); + + if (!empty($amqpTableOptions)) { + $message->set('application_headers', new AMQPTable($amqpTableOptions)); + } + + $this->client->publish($message, $exchangeName, $queueName); + } + + abstract public static function getQueueType(): string; + + protected function getDefinitionExchangeName(DefinitionInterface $definition): string + { + if ($definition->getQueueType() === (QueueTypeEnum::FIFO | QueueTypeEnum::DEDUPLICATE)) { + return self::DEFAULT_NAME; + } + + return $definition->getQueueType() === QueueTypeEnum::FIFO + ? self::DEFAULT_NAME + : $definition->getEntryPointName() + ; + } + + protected function getDefinitionQueueName(DefinitionInterface $definition): string + { + if ($definition->getQueueType() === (QueueTypeEnum::FIFO | QueueTypeEnum::DEDUPLICATE)) { + return $definition::getQueueName(); + } + + return $definition->getQueueType() === QueueTypeEnum::FIFO + ? $definition::getQueueName() + : self::DEFAULT_NAME + ; + } +} diff --git a/Publisher/DeduplicateDelayPublisher.php b/Publisher/DeduplicateDelayPublisher.php new file mode 100644 index 0000000..578b2c7 --- /dev/null +++ b/Publisher/DeduplicateDelayPublisher.php @@ -0,0 +1,48 @@ + 'unique_key', 'delay' => 1000]; # Опции, в завис $producer->put('queue_name', $data, $options); ``` +### Publisher +Публикация сообщений в очередь происходит с помощью специальных классов паблишеров. +`Producer` определяет какой паблишер использовать для публикации по типу очереди, с которым связан паблишер. + +Соответственно на каждый новый тип очереди требуется свой класс `Publisher` с кастомной логикой обработки/валидации и публикации сообщений в канал. + +Бандл поддерживает следующие типы очередей: + - FIFO + - Delay + - Deduplicate + - Deduplicate + Delay + +При желании добавить собственный тип очереди, необходимо создать класс `Publisher` наследующий [AbstractPublisher](Publisher/AbstractPublisher.php) или реализующий [PublisherInterface](Publisher/PublisherInterface.php). + +Пример DelayPublisher: +```php +publisherRegistry = $publisherRegistry; + } + + /** + * @throws PublisherNotFoundException + */ + public function getPublisher(int $queueType): AbstractPublisher + { + if ($this->publisherRegistry->has($queueType)) { + return $this->publisherRegistry->get($queueType); + } + + throw new PublisherNotFoundException(sprintf('Publisher for queue type "%s" not found', $queueType)); + } +} diff --git a/Resources/config/services.yaml b/Resources/config/services.yaml index f960d94..725549e 100644 --- a/Resources/config/services.yaml +++ b/Resources/config/services.yaml @@ -11,6 +11,10 @@ services: arguments: - !tagged_locator { tag: !php/const Wakeapp\Bundle\RabbitQueueBundle\Consumer\ConsumerInterface::TAG, default_index_method: 'getName' } + Wakeapp\Bundle\RabbitQueueBundle\Registry\PublisherRegistry: + arguments: + - !tagged_locator { tag: !php/const Wakeapp\Bundle\RabbitQueueBundle\Publisher\PublisherInterface::TAG, default_index_method: 'getQueueType' } + Wakeapp\Bundle\RabbitQueueBundle\Registry\DefinitionRegistry: arguments: - !tagged_locator { tag: !php/const Wakeapp\Bundle\RabbitQueueBundle\Definition\DefinitionInterface::TAG, default_index_method: 'getQueueName' } @@ -27,11 +31,37 @@ services: tags: - { name: !php/const Wakeapp\Bundle\RabbitQueueBundle\Hydrator\HydratorInterface::TAG, default_index_method: 'getKey' } + Wakeapp\Bundle\RabbitQueueBundle\Publisher\AbstractPublisher: + arguments: + - '@Wakeapp\Bundle\RabbitQueueBundle\Client\RabbitMqClient' + - '@Wakeapp\Bundle\RabbitQueueBundle\Registry\HydratorRegistry' + - '%wakeapp_rabbit_queue.hydrator_name%' + + Wakeapp\Bundle\RabbitQueueBundle\Publisher\FifoPublisher: + parent: 'Wakeapp\Bundle\RabbitQueueBundle\Publisher\AbstractPublisher' + tags: + - { name: !php/const Wakeapp\Bundle\RabbitQueueBundle\Publisher\PublisherInterface::TAG, default_index_method: 'getQueueType' } + + Wakeapp\Bundle\RabbitQueueBundle\Publisher\DelayPublisher: + parent: 'Wakeapp\Bundle\RabbitQueueBundle\Publisher\AbstractPublisher' + tags: + - { name: !php/const Wakeapp\Bundle\RabbitQueueBundle\Publisher\PublisherInterface::TAG, default_index_method: 'getQueueType' } + + Wakeapp\Bundle\RabbitQueueBundle\Publisher\DeduplicateDelayPublisher: + parent: 'Wakeapp\Bundle\RabbitQueueBundle\Publisher\AbstractPublisher' + tags: + - { name: !php/const Wakeapp\Bundle\RabbitQueueBundle\Publisher\PublisherInterface::TAG, default_index_method: 'getQueueType' } + + Wakeapp\Bundle\RabbitQueueBundle\Publisher\DeduplicatePublisher: + parent: 'Wakeapp\Bundle\RabbitQueueBundle\Publisher\AbstractPublisher' + tags: + - { name: !php/const Wakeapp\Bundle\RabbitQueueBundle\Publisher\PublisherInterface::TAG, default_index_method: 'getQueueType' } + Wakeapp\Bundle\RabbitQueueBundle\Producer\RabbitMqProducer: arguments: - - '@PhpAmqpLib\Connection\AMQPStreamConnection' - '@Wakeapp\Bundle\RabbitQueueBundle\Registry\DefinitionRegistry' - '@Wakeapp\Bundle\RabbitQueueBundle\Registry\HydratorRegistry' + - '@Wakeapp\Bundle\RabbitQueueBundle\Registry\PublisherRegistry' - '%wakeapp_rabbit_queue.hydrator_name%' Wakeapp\Bundle\RabbitQueueBundle\Producer\RabbitMqProducerInterface: '@Wakeapp\Bundle\RabbitQueueBundle\Producer\RabbitMqProducer' @@ -43,7 +73,7 @@ services: Wakeapp\Bundle\RabbitQueueBundle\Command\ConsumerRunCommand: calls: - - ['dependencyInjection', [ '@Wakeapp\Bundle\RabbitQueueBundle\Registry\ConsumerRegistry', '@Wakeapp\Bundle\RabbitQueueBundle\Client\RabbitMqClient', '@?logger' ]] + - ['dependencyInjection', [ '@Wakeapp\Bundle\RabbitQueueBundle\Registry\ConsumerRegistry', '@Wakeapp\Bundle\RabbitQueueBundle\Client\RabbitMqClient', '@Wakeapp\Bundle\RabbitQueueBundle\Registry\DefinitionRegistry', '@Symfony\Component\DependencyInjection\ParameterBag\ParameterBagInterface','@?logger' ]] tags: [ 'console.command' ] Wakeapp\Bundle\RabbitQueueBundle\Command\UpdateDefinitionCommand: diff --git a/Tests/Command/ConsumerRunCommandTest.php b/Tests/Command/ConsumerRunCommandTest.php index bcd4c22..0756374 100644 --- a/Tests/Command/ConsumerRunCommandTest.php +++ b/Tests/Command/ConsumerRunCommandTest.php @@ -8,9 +8,11 @@ use Symfony\Component\Console\Application; use Symfony\Component\Console\Exception\RuntimeException; use Symfony\Component\Console\Tester\CommandTester; +use Symfony\Component\DependencyInjection\ParameterBag\ParameterBagInterface; use Wakeapp\Bundle\RabbitQueueBundle\Client\RabbitMqClient; use Wakeapp\Bundle\RabbitQueueBundle\Command\ConsumerRunCommand; use Wakeapp\Bundle\RabbitQueueBundle\Registry\ConsumerRegistry; +use Wakeapp\Bundle\RabbitQueueBundle\Registry\DefinitionRegistry; class ConsumerRunCommandTest extends TestCase { @@ -26,7 +28,9 @@ protected function setUp(): void $command = new ConsumerRunCommand(); $command->dependencyInjection( $this->createMock(ConsumerRegistry::class), - $this->createMock(RabbitMqClient::class) + $this->createMock(RabbitMqClient::class), + $this->createMock(DefinitionRegistry::class), + $this->createMock(ParameterBagInterface::class) ); $this->application->add($command); @@ -34,7 +38,7 @@ protected function setUp(): void public function testExecute(): void { - $command = $this->application->find('rabbit:consume:run'); + $command = $this->application->find('rabbit:consumer:run'); $commandTester = new CommandTester($command); $commandTester->execute(['name' => 'example']); @@ -48,7 +52,7 @@ public function testExecuteFailWithoutNameParameter(): void { $this->expectException(RuntimeException::class); - $command = $this->application->find('rabbit:consume:run'); + $command = $this->application->find('rabbit:consumer:run'); $commandTester = new CommandTester($command); $commandTester->execute([]); diff --git a/Tests/Producer/RabbitMqProducerTest.php b/Tests/Producer/RabbitMqProducerTest.php index c3d3840..b66b52a 100644 --- a/Tests/Producer/RabbitMqProducerTest.php +++ b/Tests/Producer/RabbitMqProducerTest.php @@ -4,17 +4,15 @@ namespace Wakeapp\Bundle\RabbitQueueBundle\Tests\Producer; -use PhpAmqpLib\Channel\AMQPChannel; -use PhpAmqpLib\Connection\AMQPStreamConnection; use PHPUnit\Framework\TestCase; use Wakeapp\Bundle\RabbitQueueBundle\Definition\ExampleDefinition; use Wakeapp\Bundle\RabbitQueueBundle\Definition\ExampleFifoDefinition; use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueEnum; -use Wakeapp\Bundle\RabbitQueueBundle\Exception\RabbitQueueException; use Wakeapp\Bundle\RabbitQueueBundle\Hydrator\JsonHydrator; use Wakeapp\Bundle\RabbitQueueBundle\Producer\RabbitMqProducer; use Wakeapp\Bundle\RabbitQueueBundle\Registry\DefinitionRegistry; use Wakeapp\Bundle\RabbitQueueBundle\Registry\HydratorRegistry; +use Wakeapp\Bundle\RabbitQueueBundle\Registry\PublisherRegistry; class RabbitMqProducerTest extends TestCase { @@ -24,11 +22,7 @@ class RabbitMqProducerTest extends TestCase protected function setUp(): void { - $connection = $this->createMock(AMQPStreamConnection::class); - $connection - ->method('channel') - ->willReturn($this->createMock(AMQPChannel::class)) - ; + $publisherRegistry = $this->createMock(PublisherRegistry::class); $definitionRegistry = $this->createMock(DefinitionRegistry::class); $definitionRegistry @@ -46,45 +40,13 @@ protected function setUp(): void ->willReturn(new JsonHydrator()) ; - $this->producer = new RabbitMqProducer($connection, $definitionRegistry, $hydratorRegistry, JsonHydrator::KEY); - } - - public function testPutToDeduplicateDelayQueue(): void - { - $options = [ - 'key' => 'unique_key', - 'delay' => 10000, - ]; - - $this->producer->put(QueueEnum::EXAMPLE_DEDUPLICATE_DELAY, self::TEST_MESSAGE, $options); - - self::assertTrue(true); - } - - /** - * @dataProvider invalidOptionsProvider - */ - public function testPutToDeduplicateDelayQueueWithoutOptions(array $options): void - { - $this->expectException(RabbitQueueException::class); - - $this->producer->put(QueueEnum::EXAMPLE_DEDUPLICATE_DELAY, self::TEST_MESSAGE, $options); + $this->producer = new RabbitMqProducer($definitionRegistry, $hydratorRegistry, $publisherRegistry, JsonHydrator::KEY); } - public function testPutToFifoTest(): void + public function testPut(): void { $this->producer->put(QueueEnum::EXAMPLE_FIFO, self::TEST_MESSAGE); self::assertTrue(true); } - - public function invalidOptionsProvider(): array - { - return [ - 'empty options' => [[]], - 'only key option' => [['key' => 'unique_key']], - 'only delay option' => [['delay' => 1]], - 'invalid string delay option' => [['delay' => 'test', 'key' => 'unique_key']], - ]; - } } diff --git a/Tests/Publisher/DeduplicateDelayPublisherTest.php b/Tests/Publisher/DeduplicateDelayPublisherTest.php new file mode 100644 index 0000000..8c6acb3 --- /dev/null +++ b/Tests/Publisher/DeduplicateDelayPublisherTest.php @@ -0,0 +1,63 @@ + 10, 'key' => 'unique_key']; + private const QUEUE_TYPE = QueueTypeEnum::FIFO | QueueTypeEnum::DELAY | QueueTypeEnum::DEDUPLICATE; + + public function testPublish(): void + { + $definition = $this->createDefinitionMock(self::TEST_QUEUE_NAME, self::TEST_EXCHANGE, self::QUEUE_TYPE); + $hydratorRegistry = $this->createHydratorRegistryMock(); + + $client = $this->createMock(RabbitMqClient::class); + $client->expects(self::once()) + ->method('publish') + ->with(self::isInstanceOf(AMQPMessage::class), self::TEST_EXCHANGE, '') + ; + + $publisher = new DeduplicateDelayPublisher($client, $hydratorRegistry, JsonHydrator::KEY); + + $publisher->publish($definition, self::TEST_MESSAGE, self::TEST_OPTIONS); + + self::assertTrue(true); + } + + /** + * @dataProvider invalidOptionsProvider + */ + public function testPublishInvalidOptions(array $options): void + { + $this->expectException(RabbitQueueException::class); + + $definition = $this->createDefinitionMock(self::TEST_QUEUE_NAME, self::TEST_EXCHANGE, self::QUEUE_TYPE); + $client = $this->createMock(RabbitMqClient::class); + $hydratorRegistry = $this->createHydratorRegistryMock(); + + $publisher = new DeduplicateDelayPublisher($client, $hydratorRegistry, JsonHydrator::KEY); + + $publisher->publish($definition, self::TEST_MESSAGE, $options); + } + + public function invalidOptionsProvider(): array + { + return [ + 'empty options' => [[]], + 'only key option' => [['key' => 'unique_key']], + 'only delay option' => [['delay' => 1]], + 'invalid string delay option' => [['delay' => 'test', 'key' => 'unique_key']], + ]; + } +} diff --git a/Tests/Publisher/DeduplicatePublisherTest.php b/Tests/Publisher/DeduplicatePublisherTest.php new file mode 100644 index 0000000..301571b --- /dev/null +++ b/Tests/Publisher/DeduplicatePublisherTest.php @@ -0,0 +1,61 @@ + 'test']; + public const QUEUE_TYPE = QueueTypeEnum::FIFO | QueueTypeEnum::DEDUPLICATE; + + public function testPublish(): void + { + $definition = $this->createDefinitionMock(self::TEST_QUEUE_NAME, self::TEST_EXCHANGE, self::QUEUE_TYPE); + $hydratorRegistry = $this->createHydratorRegistryMock(); + + $client = $this->createMock(RabbitMqClient::class); + $client->expects(self::once()) + ->method('publish') + ->with(self::isInstanceOf(AMQPMessage::class), '', self::TEST_QUEUE_NAME) + ; + + $publisher = new DeduplicatePublisher($client, $hydratorRegistry, JsonHydrator::KEY); + + $publisher->publish($definition, self::TEST_MESSAGE, self::TEST_OPTIONS); + + self::assertTrue(true); + } + + /** + * @dataProvider invalidOptionsProvider + */ + public function testPublishInvalidOptions(array $options): void + { + $this->expectException(RabbitQueueException::class); + + $definition = $this->createDefinitionMock(self::TEST_QUEUE_NAME, self::TEST_QUEUE_NAME, self::QUEUE_TYPE); + $hydratorRegistry = $this->createHydratorRegistryMock(); + $client = $this->createMock(RabbitMqClient::class); + + $publisher = new DeduplicatePublisher($client, $hydratorRegistry, JsonHydrator::KEY); + + $publisher->publish($definition, self::TEST_MESSAGE, $options); + } + + public function invalidOptionsProvider(): array + { + return [ + 'empty options' => [[]], + 'invalid key option' => [['key' => 1]], + ]; + } +} diff --git a/Tests/Publisher/DelayPublisherTest.php b/Tests/Publisher/DelayPublisherTest.php new file mode 100644 index 0000000..a3439f0 --- /dev/null +++ b/Tests/Publisher/DelayPublisherTest.php @@ -0,0 +1,61 @@ + 10]; + public const QUEUE_TYPE = QueueTypeEnum::FIFO | QueueTypeEnum::DELAY; + + public function testPublish(): void + { + $definition = $this->createDefinitionMock(self::TEST_QUEUE_NAME, self::TEST_EXCHANGE, self::QUEUE_TYPE); + $hydratorRegistry = $this->createHydratorRegistryMock(); + + $client = $this->createMock(RabbitMqClient::class); + $client->expects(self::once()) + ->method('publish') + ->with(self::isInstanceOf(AMQPMessage::class), self::TEST_EXCHANGE, '') + ; + + $publisher = new DelayPublisher($client, $hydratorRegistry, JsonHydrator::KEY); + + $publisher->publish($definition, self::TEST_MESSAGE, self::TEST_OPTIONS); + + self::assertTrue(true); + } + + /** + * @dataProvider invalidOptionsProvider + */ + public function testPublishInvalidOptions(array $options): void + { + $this->expectException(RabbitQueueException::class); + + $definition = $this->createDefinitionMock(self::TEST_QUEUE_NAME, self::TEST_EXCHANGE, self::QUEUE_TYPE); + $hydratorRegistry = $this->createHydratorRegistryMock(); + $client = $this->createMock(RabbitMqClient::class); + + $publisher = new DelayPublisher($client, $hydratorRegistry, JsonHydrator::KEY); + + $publisher->publish($definition, self::TEST_MESSAGE, $options); + } + + public function invalidOptionsProvider(): array + { + return [ + 'empty options' => [[]], + 'invalid delay option' => [['delay' => '1']], + ]; + } +} diff --git a/Tests/Publisher/FifoPublisherTest.php b/Tests/Publisher/FifoPublisherTest.php new file mode 100644 index 0000000..acf0a96 --- /dev/null +++ b/Tests/Publisher/FifoPublisherTest.php @@ -0,0 +1,35 @@ +createDefinitionMock(self::TEST_QUEUE_NAME, self::TEST_EXCHANGE, self::QUEUE_TYPE); + $hydratorRegistry = $this->createHydratorRegistryMock(); + + $client = $this->createMock(RabbitMqClient::class); + $client->expects(self::once()) + ->method('publish') + ->with(self::isInstanceOf(AMQPMessage::class), '', self::TEST_QUEUE_NAME) + ; + + $publisher = new FifoPublisher($client, $hydratorRegistry, JsonHydrator::KEY); + + $publisher->publish($definition, self::TEST_MESSAGE); + + self::assertTrue(true); + } +} diff --git a/Tests/TestCase/AbstractTestCase.php b/Tests/TestCase/AbstractTestCase.php new file mode 100644 index 0000000..1779384 --- /dev/null +++ b/Tests/TestCase/AbstractTestCase.php @@ -0,0 +1,65 @@ +entryPointName = $entryPointName; + $this->queueType = $queueType; + self::$queueName = $queueName; + } + + public function init(AMQPStreamConnection $connection) + { + } + + public function getEntryPointName(): string + { + return $this->entryPointName; + } + + public function getQueueType(): int + { + return $this->queueType; + } + + public static function getQueueName(): string + { + return self::$queueName; + } + }; + } + + protected function createHydratorRegistryMock(): HydratorRegistry + { + $hydratorRegistry = $this->createMock(HydratorRegistry::class); + $hydratorRegistry + ->method('getHydrator') + ->with(JsonHydrator::KEY) + ->willReturn(new JsonHydrator()) + ; + + return $hydratorRegistry; + } +}