Skip to content

Commit

Permalink
feature #43354 [Messenger] allow processing messages in batches (nico…
Browse files Browse the repository at this point in the history
…las-grekas)

This PR was merged into the 5.4 branch.

Discussion
----------

[Messenger] allow processing messages in batches

| Q             | A
| ------------- | ---
| Branch?       | 5.4
| Bug fix?      | no
| New feature?  | yes
| Deprecations? | no
| Tickets       | #36910
| License       | MIT
| Doc PR        | -

This replaces #42873 as it proposes an alternative approach to handling messages in batch.

`BatchHandlerInterface` says it all: if a handler implements this interface, then it should expect a new `$ack` optional argument to be provided when `__invoke()` is called. When `$ack` is not provided, `__invoke()` is expected to handle the message synchronously as usual. But when `$ack` is provided, `__invoke()` is expected to buffer the message and its `$ack` function, and to return the number of pending messages in the batch.

Batch handlers are responsible for deciding when they flush their buffers, calling the `$ack` functions while doing so.

Best reviewed [ignoring whitespaces](https://github.com/symfony/symfony/pull/43354/files?w=1).

Here is what a batch handler might look like:

```php
class MyBatchHandler implements BatchHandlerInterface
{
    use BatchHandlerTrait;

    public function __invoke(MyMessage $message, Acknowledger $ack = null)
    {
        return $this->handle($message, $ack);
    }

    private function process(array $jobs): void
    {
        foreach ($jobs as [$job, $ack]) {
            try {
                // [...] compute $result from $job
                $ack->ack($result);
            } catch (\Throwable $e) {
                $ack->nack($e);
            }
        }
    }
}
```

By default, `$jobs` contains the messages to handle, but it can be anything as returned by `BatchHandlerTrait::schedule()` (eg a Symfony HttpClient response derived from the message, a promise, etc.).

The size of the batch is controlled by `BatchHandlerTrait::shouldProcess()` (defaults to 10).

The transport is acknowledged in batch, *after* the bus returned from dispatching (unlike what is done in #42873). This is especially important when considering transactions since we don't want to ack unless the transaction committed successfully.

By default, pending batches are flushed when the worker is idle and when it stops.

Commits
-------

81e52b2 [Messenger] allow processing messages in batches
  • Loading branch information
fabpot committed Oct 30, 2021
2 parents 39123f0 + 81e52b2 commit 2d817e1
Show file tree
Hide file tree
Showing 14 changed files with 758 additions and 75 deletions.
Expand Up @@ -215,7 +215,8 @@ public function testItReceivesSignals()
with stamps: [
"Symfony\\Component\\Messenger\\Bridge\\Amqp\\Transport\\AmqpReceivedStamp",
"Symfony\\Component\\Messenger\\Stamp\\ReceivedStamp",
"Symfony\\Component\\Messenger\\Stamp\\ConsumedByWorkerStamp"
"Symfony\\Component\\Messenger\\Stamp\\ConsumedByWorkerStamp",
"Symfony\\Component\\Messenger\\Stamp\\AckStamp"
]
Done.
Expand Down
1 change: 1 addition & 0 deletions src/Symfony/Component/Messenger/CHANGELOG.md
Expand Up @@ -5,6 +5,7 @@ CHANGELOG
---

* Add `AsMessageHandler` attribute for declaring message handlers on PHP 8.
* Add support for handling messages in batches with `BatchHandlerInterface` and corresponding trait
* Add `StopWorkerExceptionInterface` and its implementation `StopWorkerException` to stop the worker.
* Add support for resetting container services after each messenger message.
* Added `WorkerMetadata` class which allows you to access the configuration details of a worker, like `queueNames` and `transportNames` it consumes from.
Expand Down
83 changes: 83 additions & 0 deletions src/Symfony/Component/Messenger/Handler/Acknowledger.php
@@ -0,0 +1,83 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Handler;

use Symfony\Component\Messenger\Exception\LogicException;

/**
* @author Nicolas Grekas <p@tchwork.com>
*/
class Acknowledger
{
private $handlerClass;
private $ack;
private $error = null;
private $result = null;

/**
* @param null|\Closure(\Throwable|null, mixed):void $ack
*/
public function __construct(string $handlerClass, \Closure $ack = null)
{
$this->handlerClass = $handlerClass;
$this->ack = $ack ?? static function () {};
}

/**
* @param mixed $result
*/
public function ack($result = null): void
{
$this->doAck(null, $result);
}

public function nack(\Throwable $error): void
{
$this->doAck($error);
}

public function getError(): ?\Throwable
{
return $this->error;
}

/**
* @return mixed
*/
public function getResult()
{
return $this->result;
}

public function isAcknowledged(): bool
{
return null === $this->ack;
}

public function __destruct()
{
if ($this->ack instanceof \Closure) {
throw new LogicException(sprintf('The acknowledger was not called by the "%s" batch handler.', $this->handlerClass));
}
}

private function doAck(\Throwable $e = null, $result = null): void
{
if (!$ack = $this->ack) {
throw new LogicException(sprintf('The acknowledger cannot be called twice by the "%s" batch handler.', $this->handlerClass));
}
$this->ack = null;
$this->error = $e;
$this->result = $result;
$ack($e, $result);
}
}
34 changes: 34 additions & 0 deletions src/Symfony/Component/Messenger/Handler/BatchHandlerInterface.php
@@ -0,0 +1,34 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Handler;

/**
* @author Nicolas Grekas <p@tchwork.com>
*/
interface BatchHandlerInterface
{
/**
* @param Acknowledger|null $ack The function to call to ack/nack the $message.
* The message should be handled synchronously when null.
*
* @return mixed The number of pending messages in the batch if $ack is not null,
* the result from handling the message otherwise
*/
//public function __invoke(object $message, Acknowledger $ack = null): mixed;

/**
* Flushes any pending buffers.
*
* @param bool $force Whether flushing is required; it can be skipped if not
*/
public function flush(bool $force): void;
}
75 changes: 75 additions & 0 deletions src/Symfony/Component/Messenger/Handler/BatchHandlerTrait.php
@@ -0,0 +1,75 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Handler;

use Symfony\Component\Messenger\Exception\LogicException;

/**
* @author Nicolas Grekas <p@tchwork.com>
*/
trait BatchHandlerTrait
{
private $jobs = [];

/**
* {@inheritdoc}
*/
public function flush(bool $force): void
{
if ($jobs = $this->jobs) {
$this->jobs = [];
$this->process($jobs);
}
}

/**
* @param Acknowledger|null $ack The function to call to ack/nack the $message.
* The message should be handled synchronously when null.
*
* @return mixed The number of pending messages in the batch if $ack is not null,
* the result from handling the message otherwise
*/
private function handle(object $message, ?Acknowledger $ack)
{
if (null === $ack) {
$ack = new Acknowledger(get_debug_type($this));
$this->jobs[] = [$message, $ack];
$this->flush(true);

return $ack->getResult();
}

$this->jobs[] = [$message, $ack];
if (!$this->shouldFlush()) {
return \count($this->jobs);
}

$this->flush(true);

return 0;
}

private function shouldFlush(): bool
{
return 10 <= \count($this->jobs);
}

/**
* Completes the jobs in the list.
*
* @list<array{0: object, 1: Acknowledger}> $jobs A list of pairs of messages and their corresponding acknowledgers
*/
private function process(array $jobs): void
{
throw new LogicException(sprintf('"%s" should implement abstract method "process()".', get_debug_type($this)));
}
}
56 changes: 27 additions & 29 deletions src/Symfony/Component/Messenger/Handler/HandlerDescriptor.php
Expand Up @@ -19,12 +19,34 @@
final class HandlerDescriptor
{
private $handler;
private $name;
private $batchHandler;
private $options;

public function __construct(callable $handler, array $options = [])
{
if (!$handler instanceof \Closure) {
$handler = \Closure::fromCallable($handler);
}

$this->handler = $handler;
$this->options = $options;

$r = new \ReflectionFunction($handler);

if (str_contains($r->name, '{closure}')) {
$this->name = 'Closure';
} elseif (!$handler = $r->getClosureThis()) {
$class = $r->getClosureScopeClass();

$this->name = ($class ? $class->name.'::' : '').$r->name;
} else {
if ($handler instanceof BatchHandlerInterface) {
$this->batchHandler = $handler;
}

$this->name = \get_class($handler).'::'.$r->name;
}
}

public function getHandler(): callable
Expand All @@ -34,7 +56,7 @@ public function getHandler(): callable

public function getName(): string
{
$name = $this->callableName($this->handler);
$name = $this->name;
$alias = $this->options['alias'] ?? null;

if (null !== $alias) {
Expand All @@ -44,37 +66,13 @@ public function getName(): string
return $name;
}

public function getOption(string $option)
public function getBatchHandler(): ?BatchHandlerInterface
{
return $this->options[$option] ?? null;
return $this->batchHandler;
}

private function callableName(callable $handler): string
public function getOption(string $option)
{
if (\is_array($handler)) {
if (\is_object($handler[0])) {
return \get_class($handler[0]).'::'.$handler[1];
}

return $handler[0].'::'.$handler[1];
}

if (\is_string($handler)) {
return $handler;
}

if ($handler instanceof \Closure) {
$r = new \ReflectionFunction($handler);
if (str_contains($r->name, '{closure}')) {
return 'Closure';
}
if ($class = $r->getClosureScopeClass()) {
return $class->name.'::'.$r->name;
}

return $r->name;
}

return \get_class($handler).'::__invoke';
return $this->options[$option] ?? null;
}
}
Expand Up @@ -15,10 +15,15 @@
use Psr\Log\NullLogger;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\NoHandlerForMessageException;
use Symfony\Component\Messenger\Handler\Acknowledger;
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
use Symfony\Component\Messenger\Handler\HandlersLocatorInterface;
use Symfony\Component\Messenger\Stamp\AckStamp;
use Symfony\Component\Messenger\Stamp\FlushBatchHandlersStamp;
use Symfony\Component\Messenger\Stamp\HandledStamp;
use Symfony\Component\Messenger\Stamp\NoAutoAckStamp;

/**
* @author Samuel Roze <samuel.roze@gmail.com>
Expand Down Expand Up @@ -60,14 +65,58 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope

try {
$handler = $handlerDescriptor->getHandler();
$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $handler($message));
$batchHandler = $handlerDescriptor->getBatchHandler();

/** @var AckStamp $ackStamp */
if ($batchHandler && $ackStamp = $envelope->last(AckStamp::class)) {
$ack = new Acknowledger(get_debug_type($batchHandler), static function (\Throwable $e = null, $result = null) use ($envelope, $ackStamp, $handlerDescriptor) {
if (null !== $e) {
$e = new HandlerFailedException($envelope, [$e]);
} else {
$envelope = $envelope->with(HandledStamp::fromDescriptor($handlerDescriptor, $result));
}

$ackStamp->ack($envelope, $e);
});

$result = $handler($message, $ack);

if (!\is_int($result) || 0 > $result) {
throw new LogicException(sprintf('A handler implementing BatchHandlerInterface must return the size of the current batch as a positive integer, "%s" returned from "%s".', \is_int($result) ? $result : get_debug_type($result), get_debug_type($batchHandler)));
}

if (!$ack->isAcknowledged()) {
$envelope = $envelope->with(new NoAutoAckStamp($handlerDescriptor));
} elseif ($ack->getError()) {
throw $ack->getError();
} else {
$result = $ack->getResult();
}
} else {
$result = $handler($message);
}

$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $result);
$envelope = $envelope->with($handledStamp);
$this->logger->info('Message {class} handled by {handler}', $context + ['handler' => $handledStamp->getHandlerName()]);
} catch (\Throwable $e) {
$exceptions[] = $e;
}
}

/** @var FlushBatchHandlersStamp $flushStamp */
if ($flushStamp = $envelope->last(FlushBatchHandlersStamp::class)) {
/** @var NoAutoAckStamp $stamp */
foreach ($envelope->all(NoAutoAckStamp::class) as $stamp) {
try {
$handler = $stamp->getHandlerDescriptor()->getBatchHandler();
$handler->flush($flushStamp->force());
} catch (\Throwable $e) {
$exceptions[] = $e;
}
}
}

if (null === $handler) {
if (!$this->allowNoHandlers) {
throw new NoHandlerForMessageException(sprintf('No handler for message "%s".', $context['class']));
Expand All @@ -85,11 +134,13 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope

private function messageHasAlreadyBeenHandled(Envelope $envelope, HandlerDescriptor $handlerDescriptor): bool
{
$some = array_filter($envelope
->all(HandledStamp::class), function (HandledStamp $stamp) use ($handlerDescriptor) {
return $stamp->getHandlerName() === $handlerDescriptor->getName();
});
/** @var HandledStamp $stamp */
foreach ($envelope->all(HandledStamp::class) as $stamp) {
if ($stamp->getHandlerName() === $handlerDescriptor->getName()) {
return true;
}
}

return \count($some) > 0;
return false;
}
}

0 comments on commit 2d817e1

Please sign in to comment.