Skip to content

Commit

Permalink
Allows to register handlers on a specific transport (and get rid of t…
Browse files Browse the repository at this point in the history
…his handler alias)
  • Loading branch information
sroze committed Apr 28, 2019
1 parent 96a7907 commit f0b2acd
Show file tree
Hide file tree
Showing 23 changed files with 426 additions and 209 deletions.
6 changes: 6 additions & 0 deletions src/Symfony/Component/Messenger/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ CHANGELOG
* Added a Doctrine transport. For example, use the `doctrine://default` DSN (this uses the `default` Doctrine entity manager)
* [BC BREAK] The `getConnectionConfiguration` method on Amqp's `Connection` has been removed.
* [BC BREAK] A `HandlerFailedException` exception will be thrown if one or more handler fails.
* [BC BREAK] The `HandlersLocationInterface::getHandlers` method needs to return `HandlerDescriptor`
instances instead of callables.
* [BC BREAK] The `HandledStamp` stamp has changed: `handlerAlias` has been renamed to `handlerName`,
`getCallableName` has been removed and its constructor only has 2 arguments now.
* [BC BREAK] The `ReceivedStamp` needs to exposes the name of the transport from which the message
has been received.

4.2.0
-----
Expand Down
18 changes: 17 additions & 1 deletion src/Symfony/Component/Messenger/Command/DebugCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ protected function execute(InputInterface $input, OutputInterface $output)
foreach ($handlersByMessage as $message => $handlers) {
$tableRows[] = [sprintf('<fg=cyan>%s</fg=cyan>', $message)];
foreach ($handlers as $handler) {
$tableRows[] = [sprintf(' handled by <info>%s</>', $handler)];
$tableRows[] = [
sprintf(' handled by <info>%s</>', $handler[0]).$this->formatConditions($handler[1]),
];
}
}

Expand All @@ -97,4 +99,18 @@ protected function execute(InputInterface $input, OutputInterface $output)
}
}
}

private function formatConditions(array $options): string
{
if (!$options) {
return '';
}

$optionsMapping = [];
foreach ($options as $key => $value) {
$optionsMapping[] = ' '.$key.'='.$value;
}

return ' (when'.implode(', ', $optionsMapping).')';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use Symfony\Component\DependencyInjection\Definition;
use Symfony\Component\DependencyInjection\Exception\RuntimeException;
use Symfony\Component\DependencyInjection\Reference;
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
use Symfony\Component\Messenger\Handler\HandlersLocator;
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;
use Symfony\Component\Messenger\TraceableMessageBus;
Expand Down Expand Up @@ -94,32 +95,33 @@ private function registerHandlers(ContainerBuilder $container, array $busIds)
$message = null;
$handlerBuses = (array) ($tag['bus'] ?? $busIds);

foreach ($handles as $message => $method) {
foreach ($handles as $message => $options) {
$buses = $handlerBuses;

if (\is_int($message)) {
$message = $method;
$method = '__invoke';
if (\is_string($options)) {
$message = $options;
$options = [];
} else {
throw new RuntimeException(sprintf('The handler configuration needs to return an array of messages or an associated array of message and configuration. Found value of type "%s" at position "%d" for service "%s".', \gettype($options), $message, $serviceId));
}
}

if (\is_array($message)) {
list($message, $priority) = $message;
} else {
$priority = $tag['priority'] ?? 0;
if (\is_string($options)) {
$options = ['method' => $options];
}

if (\is_array($method)) {
if (isset($method['bus'])) {
if (!\in_array($method['bus'], $busIds)) {
$messageLocation = isset($tag['handles']) ? 'declared in your tag attribute "handles"' : ($r->implementsInterface(MessageSubscriberInterface::class) ? sprintf('returned by method "%s::getHandledMessages()"', $r->getName()) : sprintf('used as argument type in method "%s::%s()"', $r->getName(), $method));
$priority = $tag['priority'] ?? $options['priority'] ?? 0;
$method = $options['method'] ?? '__invoke';

throw new RuntimeException(sprintf('Invalid configuration %s for message "%s": bus "%s" does not exist.', $messageLocation, $message, $method['bus']));
}
if (isset($options['bus'])) {
if (!\in_array($options['bus'], $busIds)) {
$messageLocation = isset($tag['handles']) ? 'declared in your tag attribute "handles"' : ($r->implementsInterface(MessageSubscriberInterface::class) ? sprintf('returned by method "%s::getHandledMessages()"', $r->getName()) : sprintf('used as argument type in method "%s::%s()"', $r->getName(), $method));

$buses = [$method['bus']];
throw new RuntimeException(sprintf('Invalid configuration %s for message "%s": bus "%s" does not exist.', $messageLocation, $message, $options['bus']));
}

$priority = $method['priority'] ?? $priority;
$method = $method['method'] ?? '__invoke';
$buses = [$options['bus']];
}

if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
Expand All @@ -141,7 +143,7 @@ private function registerHandlers(ContainerBuilder $container, array $busIds)
}

foreach ($buses as $handlerBus) {
$handlersByBusAndMessage[$handlerBus][$message][$priority][] = $definitionId;
$handlersByBusAndMessage[$handlerBus][$message][$priority][] = [$definitionId, $options];
}
}

Expand All @@ -154,15 +156,20 @@ private function registerHandlers(ContainerBuilder $container, array $busIds)
foreach ($handlersByBusAndMessage as $bus => $handlersByMessage) {
foreach ($handlersByMessage as $message => $handlersByPriority) {
krsort($handlersByPriority);
$handlersByBusAndMessage[$bus][$message] = array_unique(array_merge(...$handlersByPriority));
$handlersByBusAndMessage[$bus][$message] = array_merge(...$handlersByPriority);
}
}

$handlersLocatorMappingByBus = [];
foreach ($handlersByBusAndMessage as $bus => $handlersByMessage) {
foreach ($handlersByMessage as $message => $handlerIds) {
$handlers = array_map(function (string $handlerId) { return new Reference($handlerId); }, $handlerIds);
$handlersLocatorMappingByBus[$bus][$message] = new IteratorArgument($handlers);
foreach ($handlersByMessage as $message => $handlers) {
$handlerDescriptors = [];
foreach ($handlers as $handler) {
$definitions[$definitionId = '.messenger.handler_descriptor.'.ContainerBuilder::hash($bus.':'.$message.':'.$handler[0])] = (new Definition(HandlerDescriptor::class))->setArguments([new Reference($handler[0]), $handler[1]]);
$handlerDescriptors[] = new Reference($definitionId);
}

$handlersLocatorMappingByBus[$bus][$message] = new IteratorArgument($handlerDescriptors);
}
}
$container->addDefinitions($definitions);
Expand Down
2 changes: 1 addition & 1 deletion src/Symfony/Component/Messenger/HandleTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ private function handle($message)

if (\count($handledStamps) > 1) {
$handlers = implode(', ', array_map(function (HandledStamp $stamp): string {
return sprintf('"%s"', $stamp->getHandlerAlias() ?? $stamp->getCallableName());
return sprintf('"%s"', $stamp->getHandlerName());
}, $handledStamps));

throw new LogicException(sprintf('Message of type "%s" was handled multiple times. Only one handler is expected when using "%s::%s()", got %d: %s.', \get_class($envelope->getMessage()), \get_class($this), __FUNCTION__, \count($handledStamps), $handlers));
Expand Down
82 changes: 82 additions & 0 deletions src/Symfony/Component/Messenger/Handler/HandlerDescriptor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Handler;

/**
* Describes a handler and the possible associated options, such as `from_transport`, `bus`, etc.
*
* @author Samuel Roze <samuel.roze@gmail.com>
*
* @experimental in 4.3
*/
final class HandlerDescriptor
{
private $handler;
private $options;

public function __construct(callable $handler, array $options = [])
{
$this->handler = $handler;
$this->options = $options;
}

public function getHandler(): callable
{
return $this->handler;
}

public function getName(): string
{
$name = $this->callableName($this->handler);
$alias = $this->options['alias'] ?? null;

if (null !== $alias) {
$name .= '@'.$alias;
}

return $name;
}

public function getOption(string $option)
{
return $this->options[$option] ?? null;
}

private function callableName(callable $handler)
{
if (\is_array($handler)) {
if (\is_object($handler[0])) {
return \get_class($handler[0]).'::'.$handler[1];
}

return $handler[0].'::'.$handler[1];
}

if (\is_string($handler)) {
return $handler;
}

if ($handler instanceof \Closure) {
$r = new \ReflectionFunction($handler);
if (false !== strpos($r->name, '{closure}')) {
return 'Closure';
}
if ($class = $r->getClosureScopeClass()) {
return $class->name.'::'.$r->name;
}

return $r->name;
}

return \get_class($handler).'::__invoke';
}
}
36 changes: 32 additions & 4 deletions src/Symfony/Component/Messenger/Handler/HandlersLocator.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
namespace Symfony\Component\Messenger\Handler;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;

/**
* Maps a message to a list of handlers.
*
* @author Nicolas Grekas <p@tchwork.com>
* @author Samuel Roze <samuel.roze@gmail.com>
*
* @experimental in 4.2
*/
Expand All @@ -25,7 +27,7 @@ class HandlersLocator implements HandlersLocatorInterface
private $handlers;

/**
* @param callable[][] $handlers
* @param HandlerDescriptor[][]|callable[][] $handlers
*/
public function __construct(array $handlers)
{
Expand All @@ -40,10 +42,23 @@ public function getHandlers(Envelope $envelope): iterable
$seen = [];

foreach (self::listTypes($envelope) as $type) {
foreach ($this->handlers[$type] ?? [] as $alias => $handler) {
if (!\in_array($handler, $seen, true)) {
yield $alias => $seen[] = $handler;
foreach ($this->handlers[$type] ?? [] as $handlerDescriptor) {
if (\is_callable($handlerDescriptor)) {
$handlerDescriptor = new HandlerDescriptor($handlerDescriptor);
}

if (!$this->shouldHandle($envelope, $handlerDescriptor)) {
continue;
}

$name = $handlerDescriptor->getName();
if (\in_array($name, $seen)) {
continue;
}

$seen[] = $name;

yield $handlerDescriptor;
}
}
}
Expand All @@ -60,4 +75,17 @@ public static function listTypes(Envelope $envelope): array
+ class_implements($class)
+ ['*' => '*'];
}

private function shouldHandle(Envelope $envelope, HandlerDescriptor $handlerDescriptor)
{
if (null === $received = $envelope->last(ReceivedStamp::class)) {
return true;
}

if (null === $expectedTransport = $handlerDescriptor->getOption('from_transport')) {
return true;
}

return $received->getTransportName() === $expectedTransport;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ interface HandlersLocatorInterface
/**
* Returns the handlers for the given message name.
*
* @return iterable|callable[] Indexed by handler alias if available
* @return iterable|HandlerDescriptor[] Indexed by handler alias if available
*/
public function getHandlers(Envelope $envelope): iterable;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\NoHandlerForMessageException;
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
use Symfony\Component\Messenger\Handler\HandlersLocatorInterface;
use Symfony\Component\Messenger\Stamp\HandledStamp;

Expand Down Expand Up @@ -54,17 +55,16 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
];

$exceptions = [];
foreach ($this->handlersLocator->getHandlers($envelope) as $alias => $handler) {
$alias = \is_string($alias) ? $alias : null;

if ($this->messageHasAlreadyBeenHandled($envelope, $handler, $alias)) {
foreach ($this->handlersLocator->getHandlers($envelope) as $handlerDescriptor) {
if ($this->messageHasAlreadyBeenHandled($envelope, $handlerDescriptor)) {
continue;
}

try {
$handledStamp = HandledStamp::fromCallable($handler, $handler($message), $alias);
$handler = $handlerDescriptor->getHandler();
$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $handler($message));
$envelope = $envelope->with($handledStamp);
$this->logger->info('Message "{class}" handled by "{handler}"', $context + ['handler' => $handledStamp->getCallableName()]);
$this->logger->info('Message "{class}" handled by "{handler}"', $context + ['handler' => $handledStamp->getHandlerName()]);
} catch (\Throwable $e) {
$exceptions[] = $e;
}
Expand All @@ -85,12 +85,11 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
return $stack->next()->handle($envelope, $stack);
}

private function messageHasAlreadyBeenHandled(Envelope $envelope, callable $handler, ?string $alias): bool
private function messageHasAlreadyBeenHandled(Envelope $envelope, HandlerDescriptor $handlerDescriptor): bool
{
$some = array_filter($envelope
->all(HandledStamp::class), function (HandledStamp $stamp) use ($handler, $alias) {
return $stamp->getCallableName() === HandledStamp::getNameFromCallable($handler) &&
$stamp->getHandlerAlias() === $alias;
->all(HandledStamp::class), function (HandledStamp $stamp) use ($handlerDescriptor) {
return $stamp->getHandlerName() === $handlerDescriptor->getName();
});

return \count($some) > 0;
Expand Down
Loading

0 comments on commit f0b2acd

Please sign in to comment.