Skip to content
This repository has been archived by the owner on Jul 15, 2021. It is now read-only.

Commit

Permalink
Merge pull request #2 from wakeapp/HUB-399
Browse files Browse the repository at this point in the history
HUB-399 optimized batch consuming, added publishers
  • Loading branch information
revilon1991 committed Mar 4, 2021
2 parents 202203b + f0f9a59 commit 91b3741
Show file tree
Hide file tree
Showing 29 changed files with 1,011 additions and 223 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,15 @@
## [Unreleased]
### Added
- Added retry exchange for rewind message in queue with delay.
- Added config parameters `idle_timeout` and `wait_timeout`.
- Added publishers: `FifoPublisher`, `DelayPublisher`, `FifoPublisher`, `DeduplicatePublisher`, `DeduplicateDelayPublisher`.

### Changed
- Optimized receiving a batch of messages in `ConsumerRunCommand`.
- Extended supported queue types by `Delay`, `Deduplicate`.

### Fixed
- Fix rewind and release partial messages by delivery tag. Changed `ReleasePartialException`, `RewindDelayPartialException`, `RewindPartialException`.

## [0.1.1] - 2021-01-14
### Changed
Expand Down
42 changes: 37 additions & 5 deletions Client/RabbitMqClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

namespace Wakeapp\Bundle\RabbitQueueBundle\Client;

use PhpAmqpLib\Wire\AMQPTable;
use Wakeapp\Bundle\RabbitQueueBundle\Enum\ExchangeEnum;
use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueHeaderOptionEnum;
use ErrorException;
use Exception;
use InvalidArgumentException;
Expand Down Expand Up @@ -37,9 +40,9 @@ public function isConsuming(): bool
* @throws AMQPTimeoutException
* @throws ErrorException
*/
public function wait()
public function wait(int $timeout = 0)
{
return $this->channel->wait();
return $this->channel->wait(null, false, $timeout);
}

/**
Expand Down Expand Up @@ -77,15 +80,26 @@ public function countNotTakenMessages(string $queueName)
/**
* @param string $queueName
* @param AMQPMessage[] $messageList
* @param int $delay
*/
public function rewindList(
string $queueName,
array $messageList
array $messageList,
int $delay = 0
): void {
$this->ackList($messageList);

foreach ($messageList as $message) {
$this->channel->batch_basic_publish($message, '', $queueName);
$headers = $message->has('application_headers') ? $message->get('application_headers') : new AMQPTable();

$retryCount = $headers->getNativeData()[QueueHeaderOptionEnum::X_RETRY] ?? 0;

$headers->set(QueueHeaderOptionEnum::X_DELAY, $delay * 1000);
$headers->set(QueueHeaderOptionEnum::X_RETRY, ++$retryCount);

$message->set('application_headers', $headers);

$this->channel->batch_basic_publish($message, ExchangeEnum::RETRY_EXCHANGE, $queueName);
}

$this->channel->publish_batch();
Expand All @@ -108,7 +122,6 @@ public function ackList(array $messageList): void

/**
* @param AMQPMessage[] $messageList
* @param bool $multiple
* @param bool $requeue
*/
public function nackList(array $messageList, bool $requeue = true): void
Expand Down Expand Up @@ -147,4 +160,23 @@ public function __destruct()
$this->channel->close();
$this->connection->close();
}

/**
* @param AMQPMessage[] $messageList
* @param string|null $exchangeName
* @param string|null $queueName
*/
public function publishBatch(array $messageList, string $exchangeName = null, string $queueName = null): void
{
foreach ($messageList as $message) {
$this->channel->batch_basic_publish($message, $exchangeName, $queueName);
}

$this->channel->publish_batch();
}

public function publish(AMQPMessage $message, string $exchangeName = null, string $queueName = null): void
{
$this->channel->basic_publish($message, $exchangeName, $queueName);
}
}
161 changes: 112 additions & 49 deletions Command/ConsumerRunCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,9 @@

namespace Wakeapp\Bundle\RabbitQueueBundle\Command;

use Wakeapp\Bundle\RabbitQueueBundle\Client\RabbitMqClient;
use Wakeapp\Bundle\RabbitQueueBundle\Exception\ConsumerNotFoundException;
use Wakeapp\Bundle\RabbitQueueBundle\Exception\ConsumerSilentException;
use Wakeapp\Bundle\RabbitQueueBundle\Exception\ReleasePartialException;
use Wakeapp\Bundle\RabbitQueueBundle\Exception\RewindPartialException;
use Wakeapp\Bundle\RabbitQueueBundle\Registry\ConsumerRegistry;
use Exception;
use JsonException;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
Expand All @@ -20,32 +15,51 @@
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\DependencyInjection\ParameterBag\ParameterBagInterface;
use Wakeapp\Bundle\RabbitQueueBundle\Client\RabbitMqClient;
use Wakeapp\Bundle\RabbitQueueBundle\Consumer\ConsumerInterface;
use Wakeapp\Bundle\RabbitQueueBundle\Exception\ConsumerNotFoundException;
use Wakeapp\Bundle\RabbitQueueBundle\Exception\ConsumerSilentException;
use Wakeapp\Bundle\RabbitQueueBundle\Exception\ReleasePartialException;
use Wakeapp\Bundle\RabbitQueueBundle\Exception\RewindDelayPartialException;
use Wakeapp\Bundle\RabbitQueueBundle\Exception\RewindPartialException;
use Wakeapp\Bundle\RabbitQueueBundle\Registry\ConsumerRegistry;
use Wakeapp\Bundle\RabbitQueueBundle\Registry\DefinitionRegistry;

use const JSON_THROW_ON_ERROR;

use function array_diff_key;
use function array_flip;
use function array_intersect_key;
use function array_map;
use function count;
use function explode;
use function in_array;
use function ini_get;
use function pcntl_signal;
use function json_encode;
use function pcntl_signal;

use const JSON_THROW_ON_ERROR;

class ConsumerRunCommand extends Command
{
protected static $defaultName = 'rabbit:consumer:run';

private ConsumerRegistry $consumerRegistry;
private RabbitMqClient $client;
private DefinitionRegistry $definitionRegistry;
private ParameterBagInterface $parameterBag;
private ?LoggerInterface $logger;

public function dependencyInjection(
ConsumerRegistry $consumerRegistry,
RabbitMqClient $client,
DefinitionRegistry $definitionRegistry,
ParameterBagInterface $parameterBag,
?LoggerInterface $logger = null
): void {
$this->consumerRegistry = $consumerRegistry;
$this->client = $client;
$this->definitionRegistry = $definitionRegistry;
$this->parameterBag = $parameterBag;
$this->logger = $logger ?? new NullLogger();
}

Expand Down Expand Up @@ -106,62 +120,111 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$messageList = [];

$this->client->qos($batchSize);
$this->client->consume($queueName, $name, function (AMQPMessage $message) use ($consumer, &$messageList) {
$queueName = $consumer->getBindQueueName();
$batchSize = $consumer->getBatchSize();
$notTakenMessagesCount = $this->client->countNotTakenMessages($queueName);

$messageList[] = $message;
$this->client->consume($queueName, $name, function (AMQPMessage $message) use (&$messageList) {
$messageList[$message->getDeliveryTag()] = $message;
});

if ($notTakenMessagesCount > 0 && count($messageList) < $batchSize) {
return;
while ($this->client->isConsuming()) {
if (count($messageList) === $batchSize) {
$this->batchConsume($consumer, $messageList);
}

$timeout = empty($messageList) ? $this->getIdleTimeout() : $this->getWaitTimeout();

try {
$consumer->process($messageList);
$this->client->wait($timeout);
} catch (AMQPTimeoutException $e) {
if (!empty($messageList)) {
$this->batchConsume($consumer, $messageList);
}
}
}

return self::SUCCESS;
}

$this->client->ackList($messageList);
protected function batchConsume(ConsumerInterface $consumer, array &$messageList): void
{
try {
$consumer->process($messageList);
$this->client->ackList($messageList);

$consumer->incrementProcessedTasksCounter();
$consumer->incrementProcessedTasksCounter();

$maxProcessedTasksCount = $consumer->getMaxProcessedTasksCount();
$maxProcessedTasksCount = $consumer->getMaxProcessedTasksCount();

if ($maxProcessedTasksCount > 0 && $maxProcessedTasksCount <= $consumer->getProcessedTasksCounter()) {
$consumer->stopPropagation();
}
} catch (RewindPartialException $exception) {
$rewindMessageList = $exception->getRewindMessageList();
if ($maxProcessedTasksCount > 0 && $maxProcessedTasksCount <= $consumer->getProcessedTasksCounter()) {
$consumer->stopPropagation();
}
} catch (RewindPartialException $exception) {
$rewindDeliveryTagList = $exception->getRewindDeliveryTagList();

$this->client->rewindList($queueName, $rewindMessageList);
} catch (ConsumerSilentException $exception) {
$this->client->nackList($messageList);
} catch (ReleasePartialException $exception) {
$releaseMessageList = $exception->getReleaseMessageList();
[$rewindMessageList, $ackMessageList] = $this->getPartialMessageList($messageList, $rewindDeliveryTagList);

$this->client->nackList($releaseMessageList);
} catch (Exception $exception) {
$this->client->nackList($messageList);
$this->client->rewindList($consumer->getBindQueueName(), $rewindMessageList);
$this->client->ackList($ackMessageList);
} catch (RewindDelayPartialException $exception) {
$definition = $this->definitionRegistry->getDefinition($consumer->getBindQueueName());
$rewindDeliveryTagList = $exception->getRewindDeliveryTagList();

$this->logger->warning('Error process queue: {errorMessage}', [
'errorMessage' => $exception->getMessage(),
]);
[$rewindMessageList, $ackMessageList] = $this->getPartialMessageList($messageList, $rewindDeliveryTagList);

throw $exception;
} finally {
$messageList = [];
$this->client->rewindList($definition::getQueueName(), $rewindMessageList, $exception->getDelay());
$this->client->ackList($ackMessageList);
} catch (ConsumerSilentException $exception) {
$this->client->nackList($messageList);
} catch (ReleasePartialException $exception) {
$releaseDeliveryTagList = $exception->getReleaseDeliveryTagList();

if ($consumer->isPropagationStopped()) {
$this->logger->info('Consumer has been propagation stopped forcibly');
[$releaseMessageList, $ackMessageList] = $this->getPartialMessageList(
$messageList,
$releaseDeliveryTagList
);

exit(0);
}
}
});
$this->client->nackList($releaseMessageList);
$this->client->ackList($ackMessageList);
} catch (Exception $exception) {
$this->client->nackList($messageList);

while ($this->client->isConsuming()) {
$this->client->wait();
$this->logger->warning('Error process queue: {errorMessage}', [
'errorMessage' => $exception->getMessage(),
]);

throw $exception;
} finally {
$messageList = [];

if ($consumer->isPropagationStopped()) {
$this->logger->info('Consumer has been propagation stopped forcibly');

exit(0);
}
}
}

return self::SUCCESS;
/**
* @param AMQPMessage[] $messageList
* @param int[] $deliveryTagList
*
* @return array<int, array<int, AMQPMessage>>
*/
private function getPartialMessageList(array $messageList, array $deliveryTagList): array
{
$deliveryTagList = array_flip($deliveryTagList);

$intersectMessageList = array_intersect_key($messageList, $deliveryTagList);
$diffMessageList = array_diff_key($messageList, $deliveryTagList);

return [$intersectMessageList, $diffMessageList];
}

private function getIdleTimeout(): int
{
return $this->parameterBag->get('wakeapp_rabbit_queue.consumer.idle_timeout');
}

private function getWaitTimeout(): int
{
return $this->parameterBag->get('wakeapp_rabbit_queue.consumer.wait_timeout');
}
}
26 changes: 25 additions & 1 deletion Command/UpdateDefinitionCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@

namespace Wakeapp\Bundle\RabbitQueueBundle\Command;

use Wakeapp\Bundle\RabbitQueueBundle\Definition\DefinitionInterface;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Wakeapp\Bundle\RabbitQueueBundle\Definition\DefinitionInterface;
use Wakeapp\Bundle\RabbitQueueBundle\Enum\ExchangeEnum;

class UpdateDefinitionCommand extends Command
{
Expand Down Expand Up @@ -56,8 +59,29 @@ protected function execute(InputInterface $input, OutputInterface $output): int
{
foreach ($this->definitionList as $definition) {
$definition->init($this->connection);

$this->bindRetryExchange($definition);
}

return self::SUCCESS;
}

private function bindRetryExchange(DefinitionInterface $definition): void
{
$queueName = $definition::getQueueName();
$channel = $this->connection->channel();

$channel->exchange_declare(
ExchangeEnum::RETRY_EXCHANGE,
'x-delayed-message',
false,
true,
false,
false,
false,
new AMQPTable(['x-delayed-type' => AMQPExchangeType::DIRECT])
);

$channel->queue_bind($queueName, ExchangeEnum::RETRY_EXCHANGE, $queueName);
}
}
11 changes: 11 additions & 0 deletions DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@ public function getConfigTreeBuilder(): TreeBuilder
->end()
->end()
->end()
->arrayNode('consumer')
->addDefaultsIfNotSet()
->children()
->integerNode('idle_timeout')
->defaultValue(0)
->end()
->integerNode('wait_timeout')
->defaultValue(3)
->end()
->end()
->end()
->end()
;

Expand Down
Loading

0 comments on commit 91b3741

Please sign in to comment.