New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Messenger] Add a new Messenger component #24411

Closed
wants to merge 2 commits into
base: master
from

Conversation

@sroze
Member

sroze commented Oct 3, 2017

Q A
Branch? master
Bug fix? no
New feature? yes
BC breaks? no
Deprecations? no
Tests pass? yes
Fixed tickets #24326
License MIT
Doc PR symfony/symfony-docs#9437

As discussed in #24326. This PR is to help going forward with the discussions of having a Message component.

Resources

What Where
Documentation In the PR
Demo In sroze/symfony-demo:message-component-demo
php-enqueue adapter 1. Source: In sroze/enqueue-bridge (to be moved as symfony/enqueue-bridge I guess)
2. Demo: In sroze/symfony-demo:message-component-demo-with-enqueue
Swarrot adapter Outdated adapter, waiting for stabilization 1. Source: In sroze/swarrot-bridge (to be moved as symfony/swarrot-bridge I guess)
2. Demo: In sroze/symfony-demo:message-component-demo-with-swarrot
HTTP adapter Outdated adapter, waiting for stabilization 1. Source: In sroze/message-http-adapter
2. Demo: In sroze/symfony-demo:message-component-demo-with-http-adapter
Web profiler integration In the pull-request

Important points

  1. Tests are not in the PR as they were written in PhpSpec & Behat. If we decide to go forward with this approach, I'll translate them to PHPUnit.
  2. The aim is not to solve all the message/queuing problems but provide a good, simple and extensible message bus for developers.
  3. The communication with the actual AMQP/API brokers is down to the adapters for now. Not sure if we need to ship some by default or not 馃

I guess that this would replace #23842 & #23315.

Changes from the proposals

Based on the comments, a few changes have been made from the proposal.

  1. MessageProducers have been renamed to MessageSenders
  2. MessageConsumers have been renamed to MessageReceivers
*/
public function handle($message)
{
call_user_func($this->callableForNextMiddleware(0), $message);

This comment has been minimized.

@mvrhov

mvrhov Oct 6, 2017

Contributor

return missing

@everzet

I'd work on naming a bit, Sam :)

Otherwise - 馃憤

*/
private $producerForMessageResolver;
public function __construct(ProducerForMessageResolverInterface $producerForMessageResolver)

This comment has been minimized.

@everzet

everzet Oct 9, 2017

Contributor

MessageSenderResolverInterface

/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class SendMessageToProducersMiddleware implements MessageBusMiddlewareInterface

This comment has been minimized.

@everzet

everzet Oct 9, 2017

Contributor

MessageSendingMiddleware

{
if ($message instanceof ConsumedMessage) {
$message = $message->getMessage();
} elseif (!empty($producers = $this->producerForMessageResolver->getProducersForMessage($message))) {

This comment has been minimized.

@everzet

everzet Oct 9, 2017

Contributor

getProducersForMessage() => getSender()

This comment has been minimized.

@sroze

sroze Oct 10, 2017

Member

As I guess we'll rename Producer to Sender I see where you are going. But you replace plural by singular here and that's an interesting place for me to explain why it's plural.

At the beginning it was returning ProducerInterface|null and I had a collection of producer that was dispatching to a set of producers to handle this plural. But there's one use-case that didn't work:

When I dispatch a message "Foo" in the message bus
Then I want the message to be sent to the producer "FooProducer"
And I want the message to be handled by the handler

i.e. I want to send the message to something and I want to call the local handler. This can be useful for audit purposes, duplication, etc... To make this use case happening, you just need to have to use a null handler. (See the note in `README.md)

Therefore, I'd say getProducersForMessage() => getSenders()

*
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class CollectionOfMessageHandlers

This comment has been minimized.

@everzet

everzet Oct 9, 2017

Contributor

MessageHandlers or MessageHandlerCollection for consistency with other names

This comment has been minimized.

@sroze

sroze Oct 10, 2017

Member

Let's go for MessageHandlerCollection.

* @author Samuel Roze <samuel.roze@gmail.com>
* @author Matthias Noback <matthiasnoback@gmail.com>
*/
class MessageBus implements MessageBusInterface

This comment has been minimized.

@everzet

everzet Oct 9, 2017

Contributor

MiddleawareMessageBus :D

This comment has been minimized.

@bitgandtter

bitgandtter Nov 25, 2017

if we took that the interface is called MessageBusMiddlewareInterface then it should be MessageBusMiddleware which is more accurate

/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
interface MessageConsumerInterface

This comment has been minimized.

@everzet

everzet Oct 9, 2017

Contributor

ConsumerInterface

/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class MessageHandlerResolver implements MessageHandlerResolverInterface

This comment has been minimized.

@everzet

everzet Oct 9, 2017

Contributor

MappedHandlerResolver

/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
interface MessageHandlerResolverInterface

This comment has been minimized.

@everzet

everzet Oct 9, 2017

Contributor

HandlerResolverInterface

/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
interface MessageProducerInterface

This comment has been minimized.

@everzet

everzet Oct 9, 2017

Contributor

ProducerInterface

/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class CallMessageHandlerMiddleware implements MessageBusMiddlewareInterface

This comment has been minimized.

@everzet

everzet Oct 9, 2017

Contributor

HandlerMiddleware

protected function execute(InputInterface $input, OutputInterface $output)
{
/** @var ContainerInterface $container */
$container = $this->getApplication()->getKernel()->getContainer();

This comment has been minimized.

@Koc

Koc Oct 9, 2017

Contributor

Can we use service locators instead of container?

This comment has been minimized.

@sroze

sroze Dec 1, 2017

Member

@Koc the point here is that the consumer - i.e. the service name - can be given as an argument. Not sure how would the service locator plays with that (as it needs to know the dependencies when constructed AFAIK)

This comment has been minimized.

@mvrhov

mvrhov Dec 2, 2017

Contributor

Require tags on consumers

This comment has been minimized.

@sroze

sroze Dec 25, 2017

Member

Correct, we can require the receivers (note the name change) to be tagged, so we can use service locators 馃憤

This comment has been minimized.

@sroze

sroze Jan 28, 2018

Member

Actually, I don't see a real point of using service locators in here. It forces us to create a new compiler pass and have another set of tags... while just using the container (as in many other commands in the FrameworkBundle) is the simplest option.

This comment has been minimized.

@dunglas

dunglas Mar 12, 2018

Member

It would be nice to promote the good practice of not injecting the container in our own code. A service locator + autoconfiguration looks better to me, especially because you have a ReceiverInterface.

This comment has been minimized.

@sroze

sroze Mar 13, 2018

Member

Auto-configuration would be nice if we also define a generic folder (such as src/MessageReceiver). Without, it feels a bit too much work for not much benefits, as I explained in the previous comment. But obviously, if the general feedback is that we need this, we can add it (even later hehe :))

This comment has been minimized.

@chalasr

chalasr Mar 13, 2018

Member

Fetching the receiver from the container has one big drawback: it must be public. I would say go for an autoconfigured tag + scoped receiver locator

This comment has been minimized.

@sroze

sroze Mar 13, 2018

Member

Fair enough, updated to use service locators 馃憤

*
* @param object $message
*
* @return mixed

This comment has been minimized.

@arnolanglade

arnolanglade Oct 10, 2017

Normally a command bus should return void, right?

This comment has been minimized.

@sroze

sroze Oct 10, 2017

Member

And that's the big point here: that's not "just" a command bus but it is a message bus. This can also be used for queries in a CQRS-type architecture.

Note that if you want to enforce such limitation for commands, you can very easily create your own instance of the MessageBus and have a special "noop returns" (or "EnforceCommandsDoNotReturnAnything") middleware that would force return null for example.

This comment has been minimized.

@arnolanglade

arnolanglade Oct 10, 2017

Ok, thank for explanations.

*/
interface MessageConsumerInterface
{
public function consume(): \Generator;

This comment has been minimized.

@arnolanglade

arnolanglade Oct 10, 2017

If I understand this method creates a collection of messages? So you don't consume messages but you create them, right? Perhaps this is not the good wording. Plus, it is strange to use a verb for a method which builds something it is better to use a word like messages()

This comment has been minimized.

@sroze

sroze Oct 10, 2017

Member

I'm really not found of the messages() suggestion but agree the naming can be better. What about FetcherInterface that has a fetch(): \Generator method? Because these things are fetching messages from various sources...

This comment has been minimized.

@arnolanglade

arnolanglade Oct 10, 2017

Or FetchedMessage::messages() :) but Fetcher::fetch() is more clear than the previous proposal.

*
* @param object $message
*/
public function produce($message);

This comment has been minimized.

@arnolanglade

This comment has been minimized.

@sroze

sroze Oct 10, 2017

Member

I'm not sure we should enforce this for now and leave the door open for results. i.e. I know that some brokers like Google Pub/Sub can return things like the "message ID" that is quite useful for logging/debugging purposes sometimes - so I can expect to see people logging these.

This comment has been minimized.

@mvrhov

mvrhov Nov 25, 2017

Contributor

Or one can pass this directly to the e.g frontend, for it to check periodically if this is already processed. and you don't need to predefine the message id and store it to application db.

3. `CallMessageHandlerMiddleware` (call the registered handle)
```php
$result = $this->get('message_bus')->dispatch(new MyMessage(/* ... */));

This comment has been minimized.

@arnolanglade

arnolanglade Oct 10, 2017

I think the bus handles message: dispatch -> handle

This comment has been minimized.

@sroze

sroze Oct 10, 2017

Member

I'm not against this change; waiting to see more opinions on this :)

return $this->messageToProducerMapping['*'];
}
return array();

This comment has been minimized.

@arnolanglade
$handlerResolver->replaceArgument(0, $this->findHandlers($container));
}
private function findHandlers(ContainerBuilder $container)

This comment has been minimized.

@arnolanglade

arnolanglade Oct 10, 2017

missing return type hint

*
* @author Samuel Roze <samuel.roze@gmail.com>
*/
interface ExceptionInterface

This comment has been minimized.

@arnolanglade

arnolanglade Oct 10, 2017

is there any value to add an empty interface?

This comment has been minimized.

@sroze

sroze Oct 10, 2017

Member

That's a good question; but it's a constant in every Symfony Component so I didn't want to start questioning this one 馃槈

This comment has been minimized.

@afurculita

afurculita Oct 10, 2017

Contributor

Using this interface, allows you to catch all the exceptions that come from this component.

This comment has been minimized.

@arnolanglade
@mvrhov

This comment has been minimized.

Contributor

mvrhov commented Dec 1, 2017

Now that 4.0 is out the door, can we get some more input from the symfony core team.
Ping @symfony/deciders

/**
* {@inheritdoc}
*/
public function getProducersForMessage($message): array

This comment has been minimized.

@javiereguiluz

javiereguiluz Dec 1, 2017

Member

could this method be refactored as follows?

return $this->messageToProducerMapping[get_class($message)] ?? $this->messageToProducerMapping['*'] ?? array();

This comment has been minimized.

@sroze

sroze Dec 1, 2017

Member

Good point 馃憤

private $messageBusService;
private $middlewareTag;
public function __construct(

This comment has been minimized.

@javiereguiluz

javiereguiluz Dec 1, 2017

Member

In Symfony, the arguments are always put on the same line, no matter how long they are.

{
$handlersByMessage = array();
foreach ($container->findTaggedServiceIds($this->handlerTag, true) as $serviceId => $tags) {

This comment has been minimized.

@javiereguiluz

javiereguiluz Dec 1, 2017

Member

Can't we use a SplPriorityQueue here instead of manually dealing with priorities? We use it here for example:

@sroze

I'll rename the following things in this pull-request, following a number of valid comments regarding the names.

  1. Producer -> Sender
  2. Consumer -> Receiver
protected function execute(InputInterface $input, OutputInterface $output)
{
/** @var ContainerInterface $container */
$container = $this->getApplication()->getKernel()->getContainer();

This comment has been minimized.

@sroze

sroze Dec 1, 2017

Member

@Koc the point here is that the consumer - i.e. the service name - can be given as an argument. Not sure how would the service locator plays with that (as it needs to know the dependencies when constructed AFAIK)

@Koc

This comment has been minimized.

Contributor

Koc commented Dec 2, 2017

@sroze can we tag all consumers with tag queue.consumer? Even reference tagged services

@mvrhov

This comment has been minimized.

Contributor

mvrhov commented Dec 2, 2017

message.consumer or message.handler? would be better.

@Koc

This comment has been minimized.

Contributor

Koc commented Dec 2, 2017

It depends on accepted terminology. Maybe even without dot separatorm like already proposed in this PR message_middleware tag.

@sroze

This comment has been minimized.

Member

sroze commented Dec 3, 2017

@Koc do you mean the message handlers or message receivers? Following the reviews, I had to rename them. Check out the concepts documentation to clarify.

@mvrhov

This comment has been minimized.

Contributor

mvrhov commented Dec 23, 2017

I've converted a small project from SimpleBus to Messaging component.
I had to change the following:

  • namespace from SimpleBus to Symfony
  • rename the method from handle to dispatch I'm still not fond of having dispatch instead of handle, but I understand the reasoning. However this will have to be explained to people.
  • change tag from command_handler to message_handler
  • change namespace from *\Command\* to *\Message\*
  • inject message_bus instead of command_bus
  • Use MessageBus instead of CommandBus

I still cannot get rid of the SimpleBus as this is missing the EventRecorder and a EventBus.
More to follow

{
return array_map(function ($handler) use ($message) {
return $handler($message);
}, $this->handlers);

This comment has been minimized.

@Taluu

Taluu Dec 24, 2017

Contributor

wouldn't an array_walk be better ? Don't see why we would need to fetch the return values of each handlers..

This comment has been minimized.

@sroze

sroze Dec 25, 2017

Member

As discussed here I think we should have all we need to get the result of handlers, to support queries. Therefore, array_map makes sense here :)

@ro0NL

And a 馃憤 from me :)

<!-- Bus -->
<service id="message_bus" class="Symfony\Component\Message\MessageBus" public="true">
<argument type="collection" /> <!-- Middlewares -->

This comment has been minimized.

@ro0NL

ro0NL Dec 24, 2017

Contributor

can it be type tagged? :)

This comment has been minimized.

@sroze

sroze Dec 25, 2017

Member

Yep, I think so 馃憤

class SendMessageMiddleware implements MiddlewareInterface
{
/**
* @var SenderLocatorInterface

This comment has been minimized.

@ro0NL

ro0NL Dec 24, 2017

Contributor

obvious IMHO

*/
public function handle($message, callable $next)
{
$this->logger->debug('Starting processing message', array(

This comment has been minimized.

@ro0NL

ro0NL Dec 24, 2017

Contributor

... message "'.get_class($message).'"'? (as for logs below).

This comment has been minimized.

@sroze

sroze Dec 25, 2017

Member

I don't think we should have that as part of the log message (as it's in the context) but happy to change if more people believe it should be the case.

This comment has been minimized.

@ro0NL

ro0NL Dec 25, 2017

Contributor

it would make identifying messages a bit easier (i.e. in the web profiler log panel, which would boldify the message class :))

This comment has been minimized.

@sroze

sroze Dec 25, 2017

Member

Only if the message is Starting processing {class} message and we add class as context I believe. Which... makes sense actually 馃憤

try {
$result = $next($message);
} catch (\Throwable $e) {
$this->logger->warning('Something went wrong while processing message', array(

This comment has been minimized.

@ro0NL

ro0NL Dec 24, 2017

Contributor

An exception occured while ...?

try {
$method = $reflection->getMethod('__invoke');
} catch (\ReflectionException $e) {

This comment has been minimized.

@ro0NL

ro0NL Dec 24, 2017

Contributor

just use hasMethod?

This comment has been minimized.

@mvrhov

mvrhov Dec 24, 2017

Contributor

getMethod has to be called nonetheless. As we need the access to the method parameters. Do we really need to call 2 methods instead of just catching the exception in those rare cases.

This comment has been minimized.

@ro0NL

ro0NL Dec 24, 2017

Contributor

Do we really need to call 2 methods instead of just catching the exception

For readability / less LoC, i'd say yes. No real issue i guess.. just looks like an unneeded micro-optim.

This comment has been minimized.

@sroze

sroze Dec 25, 2017

Member

Note that we should also support having the handles attribute on the tag. In case a handler is handling multiple commands/queries and/or the typehint is not present, we need to be able to specify the handled message.

*/
public function __construct(array $middlewares = array())
{
$this->middlewares = $middlewares;

This comment has been minimized.

@ro0NL

ro0NL Dec 24, 2017

Contributor

to leverage tagged (which implies iterable type), what about;

$this->middlewares  = is_array($middlewares) ? array_values($middlewares) : iterator_to_array($middlewares, false);

or find a lazy way?

This comment has been minimized.

@sroze

sroze Dec 25, 2017

Member

Yup, good point. Why would you array_values if array?

This comment has been minimized.

@ro0NL

ro0NL Dec 25, 2017

Contributor

to re-index middlewares, ensuring callableForNextMiddleware implem always works. (needs an int $index type btw :))

*/
private $serializer;
public function __construct(SerializerInterface $serializer)

This comment has been minimized.

@ro0NL

ro0NL Dec 24, 2017

Contributor

, $format = 'json?

*
* @param object $message
*
* @return array

This comment has been minimized.

@ro0NL

ro0NL Dec 24, 2017

Contributor

What about Transport\EncodedMessage?

This comment has been minimized.

@sroze

sroze Dec 25, 2017

Member

What would be the value?

This comment has been minimized.

@ro0NL

ro0NL Dec 25, 2017

Contributor

type info :) i.e. not having to rely on such a comment;

* The most common keys of the encoded array are:
* - `body` (string) - the message body
* - `headers` (string<string>) - a key/value pair of headers

common means required?

This comment has been minimized.

@sroze

sroze Dec 25, 2017

Member

But much less flexibility for use-cases we don't know about yet. That's for the same reason that the serializer's context is an array.

This comment has been minimized.

@Nyholm

Nyholm Jan 7, 2018

Member

I've been back and forth on this.
Serializers are never used in Symfony (message component) code. They are only used by queues to transform a PHP object to array to json. Which means that we should not really care. =)

Except for the SymfonySerialization that ships with the component, any implementation of EncoderInterface and DecoderInterface will only be used inside the concept of a "my-queue-bridge". Which leads to the question, do we need these interfaces?

This comment has been minimized.

@sroze

sroze Jan 7, 2018

Member

Serializers are never used in Symfony (message component) code. They are only used by queues to transform a PHP object to array to json.

I've read it this way:

Encoders are never used in Symfony (message component) code. They are only used by adapters to transform a PHP object to a PHP array. Adapters will then transform the PHP array to their own transport layer (can be JSON, can be whatever...)

Which means that we should not really care. =)

We, as users, that's correct, we shouldn't care at all about these technical details when dispatching messages to the bus.

Any implementation of EncoderInterface and DecoderInterface will only be used inside the concept of a "my-queue-bridge".

I don't think so, that's why I've put these interfaces within the component. There are multiple use-cases that are not adapter-specific but might provider encoder/decoders:

  • When dispatching messages, it's pretty useful to be able to trace them. To get things like OpenTracing or Zipkin to work well, all we need is "just" proper headers on the messages (via AMQP, HTTP, whatever). So a symfony-message-zipkin package could offer encoder/decoders decorators to populate and read these headers, regardless of the transport.
  • It might be useful for some users/companies to add their own routing specific keys (in body, header or anything else) within the message before it being sent to either of the senders (which could be different adapters). Such encoder/decoder would help them here.

Which leads to the question, do we need these interfaces?

The reasons given above expresses my answer which is: I believe we need them. Also, it reduces the duplication within the various adapters and reduces the overhead of creating one.

#### Same bus received and sender
To allow us to receive and send messages on the same bus and prevent a loop, the message bus is equipped with the
`WrappedIntoReceivedMessage` received. It will wraps the received messages into `ReceivedMessage` objects and the

This comment has been minimized.

@ro0NL

ro0NL Dec 24, 2017

Contributor

WrapIntoReceivedMessage + It will wrap the..

This comment has been minimized.

@sroze

sroze Dec 25, 2017

Member

Yep, good point 馃憤

*/
interface ReceiverInterface
{
public function receive(): \Generator;

This comment has been minimized.

@ro0NL

ro0NL Dec 24, 2017

Contributor

iterable?

This comment has been minimized.

@sroze

sroze Mar 13, 2018

Member

This has been done then reverted to \Generator for the reasons described in this comment.

/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class HandlerLocator implements HandlerLocatorInterface

This comment has been minimized.

@mvrhov

mvrhov Dec 24, 2017

Contributor

@sroze: HandlerLocator should be able to lazy load the handlers. Now the question is, can I change this class to have ServiceLocator injected into the constructor, or should a new class be created and set as default in framework bundle.

This comment has been minimized.

@ro0NL

ro0NL Dec 24, 2017

Contributor

+1 for a PSR-11 implem. (IMHO fits core/component)

This comment has been minimized.

@sroze

sroze Dec 25, 2017

Member

@mvrhov did the work in sroze#3 and it has been merged within this PR 馃憦

{
$this
->setDefinition(array(
new InputArgument('consumer', InputArgument::REQUIRED, 'Name of the consumer'),

This comment has been minimized.

@sroze

sroze Dec 25, 2017

Member

We renamed "consumer" to "receiver": this needs to reflect the change as well.

protected function execute(InputInterface $input, OutputInterface $output)
{
/** @var ContainerInterface $container */
$container = $this->getApplication()->getKernel()->getContainer();

This comment has been minimized.

@sroze

sroze Dec 25, 2017

Member

Correct, we can require the receivers (note the name change) to be tagged, so we can use service locators 馃憤

<!-- Bus -->
<service id="message_bus" class="Symfony\Component\Message\MessageBus" public="true">
<argument type="collection" /> <!-- Middlewares -->

This comment has been minimized.

@sroze

sroze Dec 25, 2017

Member

Yep, I think so 馃憤

*/
public function handle($message, callable $next)
{
$this->logger->debug('Starting processing message', array(

This comment has been minimized.

@sroze

sroze Dec 25, 2017

Member

Only if the message is Starting processing {class} message and we add class as context I believe. Which... makes sense actually 馃憤

/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class HandlerLocator implements HandlerLocatorInterface

This comment has been minimized.

@sroze

sroze Dec 25, 2017

Member

@mvrhov did the work in sroze#3 and it has been merged within this PR 馃憦

throw new \RuntimeException(sprintf('Bus "%s" is not a valid message bus. It should implement the interface "%s"', $busName, MessageBusInterface::class));
}
foreach ($receiver->receive() as $message) {

This comment has been minimized.

@makasim

makasim Dec 26, 2017

Contributor

This kind of API wouldn't allow gaining the best possible performance while dealing with AMQP and consuming from several queues at the same time.

This comment has been minimized.

@sroze

sroze Dec 27, 2017

Member

@makasim what is the main blocker in "gaining the best possible performance"? I believe this is up to the adapters anyway? Maybe things we could do on the enqueue adapter for example? 馃

This comment has been minimized.

@makasim

makasim Dec 27, 2017

Contributor

@sroze Is receiver able to consume only from one queue? Should I spawn 100 processes (for each queue) if I have 100 queues?

This comment has been minimized.

@mvrhov

mvrhov Dec 27, 2017

Contributor

Do you in reality have 100 queues? In our biggest project we have 10.

This comment has been minimized.

@sroze

sroze Dec 28, 2017

Member

I don't see why you couldn't pull from multiple queues at the same time from the adapter(s) ? That's the beauty (to me) for such interface based on the generators (here even more generic, iterables): it can come from everywhere and even through react-based receivers I believe.

This comment has been minimized.

@makasim

makasim Jan 7, 2018

Contributor

Do you in reality have 100 queues? In our biggest project we have 10.

  • Let's say we need 50 Mb to boot up an application (load kernel, container etc). So we need 10 processes to serve ten queues, right? You need at least 500Mb to boot up them and then while working they eat more.
  • CPU is used a bit more.
  • In development, it is very convenient to run a single command to consume from all queues.
  • There are high load queues and rarely used ones. You can group rarely used ones to be served by a single command, saved memory could be used to run additional consumers for high load queues.

This comment has been minimized.

@makasim

makasim Jan 7, 2018

Contributor

I don't see why you couldn't pull from multiple queues at the same time from the adapter(s) ?

Let's talk about AMQP. To get the most of it we should use AMQP's basic.consume method.

Here's AMQP interop based example through same is applied to php-amqplib, bunny and amqp-ext.

@sroze How would make the example work with the current receiver interface.

$container = $this->getApplication()->getKernel()->getContainer();
if (!$container->has($receiverName = $input->getArgument('receiver'))) {
throw new \RuntimeException(sprintf('Receiver "%s" do not exists', $receiverName));

This comment has been minimized.

@javiereguiluz

javiereguiluz Dec 26, 2017

Member

do not exists -> does not exist

}
if (!$container->has($busName = $input->getOption('bus'))) {
throw new \RuntimeException(sprintf('Bus "%s" do not exists', $busName));

This comment has been minimized.

@javiereguiluz

javiereguiluz Dec 26, 2017

Member

do not exists -> does not exist

if (!$container->has($receiverName = $input->getArgument('receiver'))) {
throw new \RuntimeException(sprintf('Receiver "%s" do not exists', $receiverName));
} elseif (!($receiver = $container->get($receiverName)) instanceof ReceiverInterface) {
throw new \RuntimeException(sprintf('Receiver "%s" is not a valid message consumer. It should implement the interface "%s"', $receiverName, ReceiverInterface::class));

This comment has been minimized.

@javiereguiluz

javiereguiluz Dec 26, 2017

Member

It should implement... -> It must implement...

if (!$container->has($busName = $input->getOption('bus'))) {
throw new \RuntimeException(sprintf('Bus "%s" do not exists', $busName));
} elseif (!($messageBus = $container->get($busName)) instanceof MessageBusInterface) {
throw new \RuntimeException(sprintf('Bus "%s" is not a valid message bus. It should implement the interface "%s"', $busName, MessageBusInterface::class));

This comment has been minimized.

@javiereguiluz

javiereguiluz Dec 26, 2017

Member

It should implement... -> It must implement...

<service id="message.middleware.call_message_handler" class="Symfony\Component\Message\Middleware\HandleMessageMiddleware">
<argument type="service" id="message.handler_resolver" />
<tag name="message_middleware" priority="-10" />

This comment has been minimized.

@javiereguiluz

javiereguiluz Dec 26, 2017

Member

I wonder if message_middleware is the best choice possible for this tag name. Unlike other frameworks, in Symfony we don't use the "middleware" word anywhere (not even in docs).

This comment has been minimized.

@sroze

sroze Dec 27, 2017

Member

Correct, we don't use middlewares yet. But I believe the term starts to be widely known and the good is: they mean the same everywhere :)

<span class="icon">{{ include('@WebProfiler/Icon/messages.svg') }}</span>
<strong>Messages</strong>
{% if collector.messages | length > 0 %}

This comment has been minimized.

@javiereguiluz

javiereguiluz Dec 26, 2017

Member

collector.messages | length -> collector.messages|length ... and for other Twig filters too.

if (empty($encodedMessage['body']) || empty($encodedMessage['headers'])) {
throw new \InvalidArgumentException('Encoded message should have at least a `body` some `headers`');
} elseif (empty($encodedMessage['headers']['type'])) {
throw new \InvalidArgumentException('Encoded message do not have a `type` header');

This comment has been minimized.

@javiereguiluz

javiereguiluz Dec 26, 2017

Member

do not have -> does not have

*/
private $serviceLocator;
public function __construct(ContainerInterface $serviceLocator)

This comment has been minimized.

@ro0NL

ro0NL Dec 27, 2017

Contributor

SF usually calls this one $container

@mvrhov

This comment has been minimized.

Contributor

mvrhov commented Dec 27, 2017

Should we move this out of the FrameworkBundle (into the MessageBundle? BusBundle?) we are cramming to much into it as it is.
And then also add the ability to easily create more buses.
The framework configuration tree is becoming bloated.

@mvrhov

This comment has been minimized.

Contributor

mvrhov commented Dec 27, 2017

There is another proposal to add event bus ready to discussion at sroze#4

@Nyholm

Great. I like this approach. Maybe it is a bit too abstract. I added a few comments from my first review

### Handlers
Once dispatched to the bus, messages will be handled by a "message handler". A message handler is a PHP callable
(i.e. a function or an instance of a class) that will do the required processing for your message. It _might_ return a

This comment has been minimized.

@Nyholm

Nyholm Dec 27, 2017

Member

I do not think this is a good idea. The two reasons that directly comes to my mind are:

  • Consider CQRS, then this is the "Command" and commands should not return anything.
  • The controller/caller does not know if the message is handled async or not.

This comment has been minimized.

@mvrhov

mvrhov Dec 27, 2017

Contributor

It's not only for CQRS and that' the reason that this is named Message and not a Command.
Also nobody says the Command cannot return something. I like the pattern, but despise using UUIDS for PKs or adding surrogate key just because I'm forced by the system to use UUID

This comment has been minimized.

@sroze

sroze Dec 28, 2017

Member

That also resonate with @aRn0D's question and my previous answer. I think that it's extremely important for us to support that. Multiple reasons:

  1. We can't expect people to know very well that in CQRS commands should not return anything.
  2. The "message bus" is not a command bus. Even in CQRS, there is this "Query" part... having a "query bus" makes sense.
  3. As the approach is generic to messages (see the HTTP adapter from the PR) this can also be applied outside of the CQRS-only pattern. So I can see people using it to abstract their requests to different micro-services for example.

This comment has been minimized.

@makasim

makasim Jan 7, 2018

Contributor

AMQP allows messages to be acknowledged, rejected or re-queued. It is not clear how I can control it.

throw new \RuntimeException(sprintf('Bus "%s" is not a valid message bus. It must implement the interface "%s"', $busName, MessageBusInterface::class));
}
foreach ($receiver->receive() as $message) {

This comment has been minimized.

@Nyholm

Nyholm Dec 27, 2017

Member

There is no ReceiverInterface implementation. Do you allow this to be blocking or not?

This comment has been minimized.

@sroze

sroze Dec 28, 2017

Member

Correct, there is not within the component directly. But there are in the adapters I've listed within the PR's description :)

/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class MessageConsumeCommand extends Command

This comment has been minimized.

@Nyholm

Nyholm Dec 27, 2017

Member

Could we create a Worker/Loop in the component?

This comment has been minimized.

@sroze
@sroze

This comment has been minimized.

Member

sroze commented Dec 28, 2017

Should we move this out of the FrameworkBundle (into the MessageBundle? BusBundle?) we are cramming to much into it as it is.

I am not sure this makes sense (for now at least) to create a dedicated Bundle for this component. FrameworkBundle is already complex and... this makes sense. All we need from the integration with the FrameworkBundle is to be able to create buses and configure the middlewares for each. Not sure we need more.

There is another proposal to add event bus ready to discussion at sroze#4

As we discussed privately already, my position on this point is that we should not bring more than one bus by default. There are obviously many reasons but the main ones are:

  1. Ease the understanding while starting with buses for beginners
  2. Multiple buses would prevent auto wiring to work properly
  3. If the FrameworkBundle integration allows us to create multiple buses, I guess that would be enough for "advanced users" to create different buses with different middlewares

Could we create a Worker/Loop in the component?

@Nyholm Even if this will be almost only a loop over the receiver to dispatch the message, I think that it is valuable. I'll, therefore, extract this from the command to a Worker class within the Message component.

@ro0NL

This comment has been minimized.

Contributor

ro0NL commented Dec 29, 2017

@sroze that means a single dispatch($eventOrCommandMessage) API right? Sounds nice!

I voted 馃憤 for the separate message bundle as i see no real reason to ship a framework bundle with message infrastructure by default.. but i guess we dont mind really (message vs. framework.message),

@ro0NL

This comment has been minimized.

Contributor

ro0NL commented Dec 29, 2017

Another reason a separate bundle might be a good idea is one can have sf/framework-bundle:3.4 + sf/message-bundle:4.1. Instead of forcing framework:4.1 =/

@sroze

This comment has been minimized.

Member

sroze commented Mar 20, 2018

The new components recently introduced have been (if I'm not mistaking) "Lock" (not "Locker"), "WebLink" (not "WebLinker"), "Workflow", "Ldap", "PropertyInfo". The only exception is "VarDumper" :)

While I do like the name "Messenger", I don't think it completely falls in line with the rest of the component names

While I tend to agree, I don't really care either "Message" or "Messenger". But if it's such a hard question to find a name, I'd vote for keeping the existing one because it "works" with existing names and changing it might not bring much value compared to the work required to rename.

@dunglas

This comment has been minimized.

Member

dunglas commented Mar 20, 2018

@ged15

This comment has been minimized.

Contributor

ged15 commented Mar 20, 2018

How about "Messaging"?

@ogizanagi

This comment has been minimized.

Member

ogizanagi commented Mar 20, 2018

I'd also vote for Message for the reasons given by @sroze & @dunglas.

@sroze

This comment has been minimized.

Member

sroze commented Mar 20, 2018

After maaany discussions, especially with @fabpot and @nicolas-grekas, the component is going to be called 芦聽Messenger聽禄.

$handlersLocatorMapping = array();
foreach ($handlersByMessage as $message => $handler) {
$handlersLocatorMapping['handles.'.$message] = $handler;

This comment has been minimized.

@mvrhov

mvrhov Mar 21, 2018

Contributor

Isn't this one supposed to be handler just like in ContainerHandlerLocator.php line 33?
I'm curious why the test didn't catch this

@sroze sroze changed the title from [Message] Add a new Message component to [Messenger] Add a new Messenger component Mar 21, 2018

@pborreli

This comment has been minimized.

Contributor

pborreli commented Mar 21, 2018

@sroze so now you have to rename your conference name at Symfony Live Paris 馃槃

protected function dispatchMessage($message)
{
if (!$this->container->has('message_bus')) {
throw new \LogicException('The message bus is not enabled in your application. Try running "composer require symfony/message".');

This comment has been minimized.

@sroze

sroze Mar 21, 2018

Member

Need to be renamed as well.

@@ -118,6 +119,7 @@ public function build(ContainerBuilder $container)
$container->addCompilerPass(new ResettableServicePass());
$container->addCompilerPass(new TestServiceContainerWeakRefPass(), PassConfig::TYPE_BEFORE_REMOVING, -32);
$container->addCompilerPass(new TestServiceContainerRealRefPass(), PassConfig::TYPE_AFTER_REMOVING);
$this->addCompilerPassIfExists($container, MessagePass::class);

This comment has been minimized.

@sroze

sroze Mar 21, 2018

Member

To be renamed to MessengerPass

<!-- Bus -->
<service id="message_bus" class="Symfony\Component\Messenger\MessageBus" public="true">
<argument type="tagged" tag="message_middleware" /> <!-- Middlewares -->

This comment has been minimized.

@sroze

sroze Mar 21, 2018

Member

To keep explicit, I'll therefore rename to message_bus_middleware.

@@ -249,6 +250,10 @@ class_exists(SemaphoreStore::class) && SemaphoreStore::isSupported() ? 'semaphor
),
),
),
'message' => array(

This comment has been minimized.

@sroze

sroze Mar 21, 2018

Member

messenger

interface SenderLocatorInterface
{
/**
* Gets the producer (if applicable) for the given message object.

This comment has been minimized.

@sroze

sroze Mar 21, 2018

Member

s/producer/sender

try {
$result = $next($message);
} catch (\Throwable $e) {
$this->logger->warning('An exception occurred while processing message {class}', array(

This comment has been minimized.

@sroze

sroze Mar 21, 2018

Member

Let's keep the same language: s/processing/handling

*/
public function handle($message, callable $next)
{
$this->logger->debug('Starting processing message {class}', array(

This comment has been minimized.

@sroze

sroze Mar 21, 2018

Member

Let's keep the same language: s/processing/handling

throw $e;
}
$this->logger->debug('Finished processing message {class}', array(

This comment has been minimized.

@sroze

sroze Mar 21, 2018

Member

Let's keep the same language: s/processing/handling

/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class MessagePass implements CompilerPassInterface

This comment has been minimized.

@sroze

sroze Mar 21, 2018

Member

Should be MessengerPass

$parameter = $parameters[0];
if (null === $parameter->getClass()) {
throw new RuntimeException(sprintf('The parameter of `__invoke` function of service "%s" must type hint the Message class it handles.', $serviceId));

This comment has been minimized.

@sroze

sroze Mar 21, 2018

Member

Message => message

@sroze

This comment has been minimized.

Member

sroze commented Mar 21, 2018

@pborreli I know 馃槄

@ro0NL

ro0NL approved these changes Mar 21, 2018

@@ -69,6 +69,13 @@
<tag name="console.command" command="debug:event-dispatcher" />
</service>
<service id="console.command.messenger_consume_message" class="Symfony\Bundle\FrameworkBundle\Command\MessengerConsumeMessagesCommand">

This comment has been minimized.

@ro0NL

ro0NL Mar 21, 2018

Contributor

s\messenger_consume_message\messenger_consume_messages

@GawainLynch

So much awesome! Thank you @sroze 馃嵕

</service>
<service id="data_collector.messenger" class="Symfony\Bundle\FrameworkBundle\DataCollector\MessengerDataCollector">
<tag name="data_collector" template="@WebProfiler/Collector/messages.html.twig" id="messenger" priority="100" />

This comment has been minimized.

@ro0NL