New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Messenger] Add AMQP adapter #26632

Merged
merged 1 commit into from Apr 12, 2018

Conversation

@sroze
Member

sroze commented Mar 22, 2018

Q A
Branch? master
Bug fix? no
New feature? yes
BC breaks? no
Deprecations? no
Tests pass? ø
License MIT
  • Depends on the Messenger component #24411
  • Add tests once we are all happy about the structure

In order to give a great DX for simple needs such as sending messages through an AMQP broker such as RabbitMq, we should ship an AMQP adapter for the Messenger component within Symfony Core. It should be as simple as this proposal. We don't need to handle more specific use-cases nor brokers as other adapters such as the enqueue adapter can also be used.

Configuring the adapter is as simple as the following configuration:

# config/packages/messenger_adapters.yaml
framework:
    messenger:
        adapter: "%env(MESSENGER_DSN)%"

With the given .env for example:

MESSENGER_DSN=amqp://guest:guest@localhost:5672/%2f/messages

Keep in mind that after having configured the adapter, developers have to route their messages to the given adapter.

# config/packages/messenger_routes.yaml
framework:
    messenger:
        routing:
producer).
            'App\Message\Command\CreateNumber': messenger.default_sender

Additionally, multiple adapters can be created and messages routed to these ones.

# config/packages/messenger_routes.yaml
framework:
    messenger:
        adapters:
            commands: "amqp://guest:guest@localhost:5672/%2f/commands"
            maintenance: "amqp://guest:guest@localhost:5672/%2f/maintenance"
        routing:
producer).
            'App\Message\Command\CreateNumber': messenger.commands_sender
            'App\Message\Command\MaintenanceSpecificCommand': messenger.maintenance_sender

@sroze sroze changed the title from [AMQP] Add AMQP adapter for Message component to [AMQP] Add AMQP adapter for Messenger component Mar 22, 2018

@nicolas-grekas

This comment has been minimized.

Member

nicolas-grekas commented Mar 22, 2018

Shouldn't this be inside the Messenger component?

use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
/**
* Symfony Message receiver to get messages from AMQP brokers using PHP's AMQP extension.

This comment has been minimized.

@javiereguiluz

javiereguiluz Mar 22, 2018

Member

Symfony Message -> Symfony Messenger ?

use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
/**
* Symfony Message sender to send messages to AMQP brokers using PHP's AMQP extension.

This comment has been minimized.

@javiereguiluz

javiereguiluz Mar 22, 2018

Member

Here too: Message -> Messenger

{
$encodedMessage = $this->messageEncoder->encode($message);
$this->connection->publish(

This comment has been minimized.

@javiereguiluz

javiereguiluz Mar 22, 2018

Member

This should probably go in one line.

*/
class Connection
{
private $amqpConnectionCredentials;

This comment has been minimized.

@javiereguiluz

javiereguiluz Mar 22, 2018

Member

Rename this to $amqpCredentials to be consistent with the other vars? $amqpChannel, $amqpExchange, etc.

This comment has been minimized.

@sroze

sroze Mar 22, 2018

Member

Or maybe $connectionCredentials ? All the variables prefixed by amqp (except this one) actually contains objects coming from PHP's AMQP library.

*/
private $amqpQueue;
public function __construct(array $amqpConnectionCredentials, string $exchangeName, string $queueName, bool $debug = false)

This comment has been minimized.

@javiereguiluz

javiereguiluz Mar 22, 2018

Member

Given how we use classes for everything in this component (and it's great!) what if we replace this PHP array to store credentials by a proper AMQPCredentials class?

This comment has been minimized.

@sroze

sroze Mar 22, 2018

Member

As we are directly passing it to the \AMQPConnection object, I believe that it's better not to enforce any constraint on what can be in these "credentials". IMHO there is no point of maintaining an object mapping all the options available for the \AMQPConnection constructor coming from PHP's AMQP library.

"license": "MIT",
"authors": [
{
"name": "Samuel Rozé",

This comment has been minimized.

@javiereguiluz

javiereguiluz Mar 22, 2018

Member

If your surname is spelled Rozé ... you need to update the PHPdoc of all classes where you spell it as Roze 😄

This comment has been minimized.

@sroze

sroze Mar 22, 2018

Member

Yeah... ASCII destroyed my surname ages ago. I gave up on the é so will revert to e 😛

/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class PhpAmqpBundle extends Bundle

This comment has been minimized.

@javiereguiluz

javiereguiluz Mar 22, 2018

Member

Here we spell AMQP as Amqp ... which I think it's the official policy of Symfony code syntax. So, in all other classes, we should replace AMQP by Amqp -> e.g. AMQPEnvelope -> AmqpEnvelope

This comment has been minimized.

@sroze

sroze Mar 22, 2018

Member

Unfortunately the \AMQP* classes such as AMQPEnvelope are coming from PHP's AMQP library 🤷‍♂️

This comment has been minimized.

@theofidry

theofidry Mar 22, 2018

Contributor

You can use aliased imports ;)

@sroze

This comment has been minimized.

Member

sroze commented Mar 22, 2018

Shouldn't this be inside the Messenger component?

That's a good question. I took the outside approach but it's debatable as the "inside" approach is basically the Cache component's approach.

Cons of having them inside are:

  1. You don't want to download an AMQP client when you get the component to use it as a message bus, or when you use another adapter.
  2. It will be harder to draw the line of what's "inside" or "outside" regarding the adapters created by the community.

The pros of having them inside:

  1. As I foresee many other adapters (for example an "HTTP" adapter - ApiPlatform or not - in order to send/receive messages via HTTP), if we decide that this one should be outside the component, it means more maintenance "cost" for us to add more "official" ones (i.e. creating packages, recipes, bundles, etc...)

With years of having many adapters within the Cache component, what is your PoV? Was that a good choice?

@nicolas-grekas

This comment has been minimized.

Member

nicolas-grekas commented Mar 22, 2018

You don't want to download an AMQP client

We shouldn't care for extra files. We're talking about 500 lines here, LICENSE included.
We just need to make ext-amqp a suggestion instead.

@sroze

This comment has been minimized.

Member

sroze commented Mar 22, 2018

@nicolas-grekas you are just quoting a small part of my answer here. I understand your answer to my question about Cache Component's choice is "yes, definitely".

Therefore, let's say we go with the adapter bundled inside, how would you have a similar DX than this one that we can have with another bundle?

  1. composer req amqp
  2. Change AMQP_DSN environment variable (if needed)
  3. Configure the routing of your message in config/packages/framework.yaml
@nicolas-grekas

This comment has been minimized.

Member

nicolas-grekas commented Mar 22, 2018

@sroze we'll just ship with a default config based on e.g. PDO. Or no default config if none are available.

@sroze

This comment has been minimized.

Member

sroze commented Mar 22, 2018

we'll just ship with a default config based on e.g. PDO

What do you mean? Where would you see this configuration to be tweaked by the developer? Could you give me a bullet-list example compared to what I described above as I don't get how it could be simple for the developer?

@nicolas-grekas

This comment has been minimized.

Member

nicolas-grekas commented Mar 22, 2018

Where would you see this configuration to be tweaked by the developer

in a recipe

@theofidry

This comment has been minimized.

Contributor

theofidry commented Mar 22, 2018

@nicolas-grekas but if you do composer req amqp and you don't have the extension installed it won't be able to pick it up if those things are handled by the recipe whereas if it's outside you can put the extension as a requirement upfront.

@xabbuh

This comment has been minimized.

Member

xabbuh commented Mar 22, 2018

@theofidry There can be a pack that also defines the dependeny on the amqp extension and we can then provide a recipe for the pack.

@theofidry

This comment has been minimized.

Contributor

theofidry commented Mar 22, 2018

@sroze

This comment has been minimized.

Member

sroze commented Mar 22, 2018

But shouldn’t that bridge be also usable from outside of Symfony?

Well, there is no point of this adapter/bridge without the Messenger component anyway, isn't it? 😉

@Nyholm

This comment has been minimized.

Member

Nyholm commented Mar 22, 2018

Shouldn't this be inside the Messenger component?

I would say no.

If it is inside the Messanger component, then why shouldn't we also add a php-amqplib/php-amqplib adapter? And a php-enqueue adapter? And an adapter for Kafka, Redis etc etc etc. Where do you draw the line?

One could argue that we should just have the one, but I think that the AMQP adapter is just a implementation detail (not a core thing) of the component.

@makasim

This comment has been minimized.

Contributor

makasim commented Mar 22, 2018

There are three AMQP implementations out there php-amqplib, bunny, amqp-ext and amqp-interop that provides a set of interop interfaces. It would be better to use amqp interop and do not hard code dependency to the extension. Let developers to choose what they want to use

@nicolas-grekas

This comment has been minimized.

Member

nicolas-grekas commented Mar 22, 2018

This needs to be in the Messenger component IMHO. We don't ship components that require third-party code, because then we cannot enforce our policies for BC/FC, deprecations, etc.

Where do you draw the line?

Exactly where our policies need to be enforced so that we can provide the Symfony guarantees.

@Nyholm

This comment has been minimized.

Member

Nyholm commented Mar 22, 2018

We don't ship components that require third-party code

That is the thing. The messanger component does not need any AMQP library/extension. That is just an extra feature.


@makasim 👍

@nicolas-grekas

This comment has been minimized.

Member

nicolas-grekas commented Mar 22, 2018

@Nyholm ??? yes it needs a store, as soon as you need async.

@sroze

This comment has been minimized.

Member

sroze commented Mar 22, 2018

Where do you draw the line?

@Nyholm That question is valid regardless of having adapters inside or outside the component: which ones becomes a symfony/ package and which ones are living somewhere else... I agree with @nicolas-grekas here, it's a must-have in terms of DX for most of the users. And does not cost anything (except few KBs) for others :)

Let developers to choose what they want to use

@makasim that's exactly what we are doing by exposing all these extension points (i.e. the Sender/Receiver and Encoder/Decoder interfaces). That's exactly why I've built the Enqueue adapter. The point here is what should be shipped directly within Symfony Core.

@Nyholm

This comment has been minimized.

Member

Nyholm commented Mar 22, 2018

yes it needs a store, as soon as you need async.

Yes, but it is not obvious that you need async.

I would still like to see that the adapters is not part of the component package. But if we really feel we want to ship some: lets do php-enqueue and ext-amqp as our "adapters shipped with the component"

@dunglas

This comment has been minimized.

Member

dunglas commented Mar 22, 2018

I'm for storing adapters directly in the component.
It's easier to discover for the end user, and it's consistent with what is done for Cache and PropertyInfo (we provide an adapter for phpDocumentator's ReflectionDocBlock, and it's inside the component directly).

@Nyholm

This comment has been minimized.

Member

Nyholm commented Mar 22, 2018

But what about all external dependencies that different adapters use? We would still need ask the user to download third party code.

throw new \Exception('To use the XXX adapter you need to download XXX/library');
public function receive(): iterable
{
while (true) {
if (null === ($message = $this->connection->waitAndGet())) {

This comment has been minimized.

@dunglas

dunglas Mar 22, 2018

Member

Useless parenthesis.

$this->debug = $debug;
}
public static function fromDsn(string $dsn, bool $debug = false)

This comment has been minimized.

@dunglas

dunglas Mar 22, 2018

Member

: self

public static function fromDsn(string $dsn, bool $debug = false)
{
if (false === ($parsedUrl = parse_url($dsn))) {

This comment has been minimized.

@dunglas

dunglas Mar 22, 2018

Member

Useless parenthesis

throw new \InvalidArgumentException(sprintf('The given AMQP DSN "%s" is invalid.', $dsn));
}
$pathParts = explode(trim($parsedUrl['path'] ?? '', '/'), '/');

This comment has been minimized.

@dunglas

dunglas Mar 22, 2018

Member

What about $pathParts = isset($parsedUrl['path'])) ? explode(trim($parsedUrl['path'], '/'), '/') : [];?

This comment has been minimized.

@sroze

sroze Mar 24, 2018

Member

Don't mind to change so did it 😉

}
$this->exchange()->publish($body, null, AMQP_NOPARAM, array(
'headers' => $headers,

This comment has been minimized.

@dunglas

dunglas Mar 22, 2018

Member

Can be inlined

$message = null;
try {
$this->queue()->consume(function (\AMQPEnvelope $envelope) use (&$message) {

This comment has been minimized.

@dunglas

dunglas Mar 22, 2018

Member

function (\AMQPEnvelope $envelope) use (&$message): bool

return false;
});
} catch (\AMQPQueueException $e) {
if (404 == $e->getCode()) {

This comment has been minimized.

@dunglas
private function channel(): \AMQPChannel
{
if (null === $this->amqpChannel) {

This comment has been minimized.

@dunglas

dunglas Mar 22, 2018

Member

What about a guard clause instead to reduce the complexity?

if (null !== $this->amqpChannel) {
    return $this->amqpChannel;
}

$connection = new \AMQPConnection($this->amqpConnectionCredentials);
if (false === $connection->connect()) {
    throw new \AMQPException('Could not connect to the AMQP server. Please verify the provided DSN.');
}

return $this->amqpChannel = new \AMQPChannel($connection);

I would do the same for all other methods in this file.

/**
* {@inheritdoc}
*/
public function getConfigTreeBuilder()

This comment has been minimized.

@dunglas

dunglas Mar 22, 2018

Member

: TreeBuilder?

$container->getParameter('kernel.debug'),
)))->setFactory(array(Connection::class, 'fromDsn'));
$container->setDefinitions(array(

This comment has been minimized.

@dunglas

dunglas Mar 22, 2018

Member

I suggest to call $container->register several times instead for readability.

@joelwurtz

This comment has been minimized.

Contributor

joelwurtz commented Mar 22, 2018

The main problem with the approach of storing adapter in the component is for versioning purpose, let's imagine there is a new version of the amqp extension with a different API. When you update this adapter it will become a problem, sure you can add a conflict version, but you may have other libs depending on the old extension (and not updated).

Also people would not be able to profit for new features / bug fix unless they update their extension (although it's an implementation detail)

However having an adapter outside this component you would just need to create a new one with the new API and maintain the old one in parallelel (or create some deprecation policy to allow people upgrading to the new one)

@nicolas-grekas nicolas-grekas added this to the 4.1 milestone Mar 22, 2018

@dunglas

This comment has been minimized.

Member

dunglas commented Mar 22, 2018

Versionning isn’t an issue. We can create bridges for both versions of the dep (we did that for Reflection DocBlock in Prophecy for instance).

Regarding extra packages, we’ll add adapters dependencies in the suggest section. And we can always create metapackages such as message-amp for DX of suggesting isn’t enough.

@sroze

This comment has been minimized.

Member

sroze commented Mar 22, 2018

I agree that the best option is actually to package the AMQP adapter within the Messenger component. This will simplify the maintenance and ease the discoverability of the adapter. In order to integrate it properly with Symfony it, therefore, requires works on the messenger section of the FrameworkBundle, like the following:

framework:
    messenger:
        adapters:
            default: "amqp://guest:guest@localhost:5672/%2f/messages"

@fabpot to prevent conflicts, it seems like the best idea is to wait for the Messenger PR to be merged within master :)

@sroze

This comment has been minimized.

Member

sroze commented Mar 30, 2018

Yep, very good idea. I guess <1.9.3,>=2.0.0 will prevent us from the BCs of the extension 2.0 as well.

@Tobion

This comment has been minimized.

Member

Tobion commented Apr 1, 2018

Could you give an example how to set options for AMQP like x-dead-letter-exchange and x-message-ttl? Those are often needed. So there should be an easy way to set those.

@sroze

This comment has been minimized.

Member

sroze commented Apr 2, 2018

@chalasr added the conflict.
@Tobion good point. Added it in 587c741 👍

@lyrixx

I left few comments

env:
global:
- MIN_PHP=7.1.3
- SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php
- MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages

This comment has been minimized.

@lyrixx

lyrixx Apr 2, 2018

Member

I would use messages as vhost (ie: removing the leading /). it's more common.

This comment has been minimized.

@sroze

sroze Apr 3, 2018

Member

Though, in RabbitMq's defaults, the default virtual host is named /. That's why I used it and I believe as it matches the default, this is the one to keep.

@@ -1459,6 +1460,23 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
} else {
$container->removeDefinition('messenger.middleware.doctrine_transaction');
}
foreach ($config['adapters'] as $name => $adapter) {
$container->setDefinition($adapterId = 'messenger.'.$name.'_adapter', (new Definition(MessengerAdapterInterface::class, array(

This comment has been minimized.

@lyrixx

lyrixx Apr 2, 2018

Member

Why don't you use "messenger.$name_adapter" ?

This comment has been minimized.

@sroze

sroze Apr 3, 2018

Member

It looks more explicit this way. But happy to change, I don't mind much.

$container->setDefinition($adapterId = 'messenger.'.$name.'_adapter', (new Definition(MessengerAdapterInterface::class, array(
$adapter['dsn'],
$adapter['options'],
)))->setFactory(array(new Reference('messenger.adapter_factory'), 'create'))->setPublic(true));

This comment has been minimized.

@lyrixx

lyrixx Apr 2, 2018

Member

Does it really need to be public ?

This comment has been minimized.

@sroze

sroze Apr 3, 2018

Member

That's going to be the adapter's object: this can be used as an extension point. That was the idea 🤔

This comment has been minimized.

@lyrixx

lyrixx Apr 3, 2018

Member

There is no correlation between an extension point and the fact it's private or public.

This comment has been minimized.

@sroze

sroze Apr 3, 2018

Member

It's trickier for people to use the service if it isn't public 😄

*
* @author Samuel Roze <samuel.roze@gmail.com>
*/
interface AdapterFactoryInterface

This comment has been minimized.

@lyrixx

lyrixx Apr 2, 2018

Member

Why do we need this interface and all implementations ? It adds another layer to understand. More over there is a strict matching between the factory and the the implementation.
You could move the supports in the the corresponding implementation (AmqpAdapter::support).
Then, the ChainAdapterFactory could be a ChainAdapter that own a lazy loaded collection of Adapter (with a service locator).
It will remove lot of code. If someone want a custom adapter, it's still possible.

I mean, I don't think we need to abstract the service creation for the end user, everything is already available in symfony for that.

This comment has been minimized.

@sroze

sroze Apr 3, 2018

Member

The thing is: for the developer experience, I believe that we need to have a very common way of creating the adapters: via a simple DSN, and that's the point of this (thin) adapter factory. That way, it's super easy for other developers to create new adapters.

If we move the supports down to the AdapterInterface, then we need to move the $dsn argument to both the receiver and sender methods and sort of force a double-parsing of it on the developer's perspective. I don't mean that it's impossible but less elegant I'd say.

This comment has been minimized.

@lyrixx

lyrixx Apr 3, 2018

Member

The parsing could (should ?) be done in the FrameworkExtension ?

This comment has been minimized.

@sroze

sroze Apr 3, 2018

Member

If it's only for "native" adapters, yes. But this can't be done for adapters coming from 3rd party libraries, and that's the entire point: we want 3rd party component to plug-in and have people using a simple DSN such as enqueue+sqs://sqs-address for example. While this wouldn't be in the core obviously.

This comment has been minimized.

@lyrixx

lyrixx Apr 4, 2018

Member

To me, it is still another layer of indirection. But I'm not strongly against this. Let's see what other think.

This comment has been minimized.

@sroze

sroze Apr 4, 2018

Member

You are right. It's "an extension point" we don't need now as just the adapter factory is enough. Removed in d6c046a.

public function receiver(): ReceiverInterface
{
return new AmqpReceiver($this->decoder, $this->connection);

This comment has been minimized.

@lyrixx

lyrixx Apr 2, 2018

Member

Why do you return a new instance each time it's called ?
More over, to me this instanciation should be executed outside of this class, in the DIC for example.

This comment has been minimized.

@sroze

sroze Apr 3, 2018

Member

This is actually registered as a service, and therefore executed once by the DIC :)

This comment has been minimized.

@lyrixx

lyrixx Apr 3, 2018

Member

The AmqpAdapter is registered in the DIC, not the AmqpReceiver isn't ?

This comment has been minimized.

@sroze

sroze Apr 3, 2018

Member

It is: check this out. The adapter service is the factory of the receiver and sender services.

This comment has been minimized.

@lyrixx

lyrixx Apr 4, 2018

Member

Sorry, I guess I missed something :( The AmqpAdapter, the sender and the receiver are indeed registred in the DIC. This this very AmqpAdapter does not use the the sender or the receiver that are in the DIC.

This comment has been minimized.

@sroze

sroze Apr 4, 2018

Member

This very AmqpAdapter is used to create the sender and receiver services. Therefore $container->get('messenger.[name]_sender') will get this very AmqpSender object.

Though, you are right that $this->get('messenger.[name]_adapter')->receiver() will return a new instance, and this might be a cause of issues. I guess I'll add a if (null === $this->receiver) ... in the adapter so it won't re-create it.

throw $e;
} catch (\Throwable $e) {
$this->connection->nack($message);

This comment has been minimized.

@lyrixx

lyrixx Apr 2, 2018

Member

I would requeue the message, because if you don't, the message will be lost

This comment has been minimized.

@sroze

sroze Apr 3, 2018

Member

Updated 👍

private $connectionCredentials;
private $exchangeConfiguration;
private $queueConfiguration;
private $debug;

This comment has been minimized.

@lyrixx

lyrixx Apr 2, 2018

Member

Can you sort this list of variables with the same order of the constructor parameters?

$message = null;
try {
$this->queue()->consume(function (\AMQPEnvelope $envelope) use (&$message): bool {

This comment has been minimized.

@lyrixx

lyrixx Apr 2, 2018

Member

I don't really like this design either.

  1. It can work with only one queue at a time
  2. Can not react (nicely) with signals
  3. The PHP process can not does something else between 2 AMQP messages
  4. It's blocking
return $this->queue()->reject($message->getDeliveryTag());
}
public function nack(\AMQPEnvelope $message)

This comment has been minimized.

@lyrixx

lyrixx Apr 2, 2018

Member

You need to expose the \AmqpQueue::nack $amqpParam

This comment has been minimized.

@sroze

sroze Apr 3, 2018

Member

Good point, done 👍

$this->exchange()->declareExchange();
$this->queue()->declareQueue();
$this->queue()->bind($this->exchange()->getName());

This comment has been minimized.

@lyrixx

lyrixx Apr 2, 2018

Member

This works because the default exchange type is Fan Out. If one use Direct is will not work anymore. You may add a routing key option to configure it

This comment has been minimized.

@sroze

sroze Apr 3, 2018

Member

Good point, added the routing key as a possible option on the queue. queue[routing_key]=... in the DSN 👍

@sroze

This comment has been minimized.

Member

sroze commented Apr 3, 2018

Thank you very much @lyrixx for the review. I've updated a few things and added comments to the rest. Can you check them out? :)

@lyrixx

This comment has been minimized.

Member

lyrixx commented Apr 3, 2018

@sroze Nice iteration. Thanks.

.travis.yml Outdated
@@ -142,6 +150,7 @@ before_install:
tfold ext.apcu tpecl apcu-5.1.6 apcu.so $INI
tfold ext.libsodium tpecl libsodium sodium.so $INI
tfold ext.mongodb tpecl mongodb-1.4.0RC1 mongodb.so $INI
tfold ext.amqp tpecl amqp-1.9.0 amqp.so $INI

This comment has been minimized.

@sroze

sroze Apr 3, 2018

Member

We need 1.9.3 if we conflict with earlier than that :)

env:
global:
- MIN_PHP=7.1.3
- SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php
- MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages

This comment has been minimized.

@sroze

sroze Apr 3, 2018

Member

Though, in RabbitMq's defaults, the default virtual host is named /. That's why I used it and I believe as it matches the default, this is the one to keep.

* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Adapter\PhpAmqp;

This comment has been minimized.

@sroze

sroze Apr 3, 2018

Member

Fair enough, this could be confusing, but Amqp is as vague. What about AmqpExt?

*
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class Connection

This comment has been minimized.

@soyuka

soyuka Apr 3, 2018

Contributor

What about adding an interface for this as well? I mean every implementation will have: ack, reject, nack, fromDsn methods so I think it may be a good idea.

This comment has been minimized.

@sroze

sroze Apr 3, 2018

Member

every implementation will have: ack, reject, nack, fromDsn methods

We don't know yet. Tbh, I'm convinced the other way around: most won't. But either way, I suggest we continue to improve this later (i.e. potentially adding an interface) when we see the real use case. (also, this Connection object is only interesting for the Amqp adapter).

$this->amqpFactory = $amqpFactory ?: new AmqpFactory();
}
public static function fromDsn(string $dsn, array $options = array(), bool $debug = false, AmqpFactory $amqpFactory = null): self

This comment has been minimized.

@soyuka

soyuka Apr 3, 2018

Contributor

What about a Trait or some reusable method for the parsing stuff?

This comment has been minimized.

@sroze

sroze Apr 3, 2018

Member

I'd use the same point used in the previous comment: we don't know yet so let's not expose too much. Or said another way: what is public (i.e. to be able to be shared/re-use) is supersensible to BC-break and adds up to our BC responsibility. Adding such exposure for an hypothetic use case is too risky IMHO.

@sroze

This comment has been minimized.

Member

sroze commented Apr 3, 2018

@lyrixx @soyuka @makasim thank you so much for your tests and reviews that pushes that adapter and the transport's design to their limits. This (hopefully last) change should resolve the last blockers.

Following the discovery of this yield/try/catch weirdness (which isn't a bug) I decided to question this choice that I loved of using an iterable as a result of the ReceiverInterface. The only way to manage errors is via sending the exceptions back to the generator (via $generator->throw($exception)): this adds a lot of complexity (i.e. if instanceof Generator checks in the catch of ReceiverInterface decorators) and allows an easy way to break this error management by forgetting to forward the error down or by simply not using generators anymore. Therefore, I changed the interface to take a message handler instead of returning that generator. It looks good ($receiver->receive($handler) is literally correct) and prevents a weird management of exceptions.

Last but not least, in order to prevent the issues mentioned by Max and tackle in Greg's PR, I moved from using the consume method to the get method. We now have to manually handle the signals but first, we can be sure it's properly handled (see the test) and it removes this very specific constraint on the AMQP library version.

$this->receiver->receive(function($message) {
if (null === $message) {
return;
}

This comment has been minimized.

@soyuka

soyuka Apr 4, 2018

Contributor

why this test?

This comment has been minimized.

@sroze

sroze Apr 4, 2018

Member

Because the receiver's receive callback can receive a null message in case of timeout. This is to tackle the point exposed by @lyrixx: that way, you can add extra capabilities (like sending pings to the monitoring, etc...) at timely intervals even if no messages are handled. You basically put a read_timeout on the connection and decorate the receiver to do whatever you want (with or without messages coming in).

@soyuka

soyuka approved these changes Apr 4, 2018

@Nyholm

Nyholm approved these changes Apr 4, 2018

I've reviewed the changes and I like it. I've not tested it and played around with the implementation yet.

<xsd:sequence>
<xsd:element name="option" type="messenger_adapter_option" minOccurs="0" maxOccurs="unbounded" />
</xsd:sequence>

This comment has been minimized.

@Nyholm

Nyholm Apr 4, 2018

Member

Remove blank line

use Symfony\Component\Messenger\Transport\SenderInterface;
/**
* @author Samuel Roze <samuel.roze@gmail.com>

This comment has been minimized.

@Nyholm

Nyholm Apr 4, 2018

Member

I would really like a line or two on this interface to explain what it does.

}
try {
if (false !== $message = $this->queue()->get()) {

This comment has been minimized.

@makasim

makasim Apr 4, 2018

Contributor
  • The get method is not adviced to be used
    • The performance is worse than basic.consume method.
    • It is sync
    • It does not support QoS (prefetch count, prefetch size),
    • It is not visible in RabbitMQ's UI.
    • It makes the bigger load on RabbitMQ server as well as the process itself (it spends CPU resources to loop)
    • You need a usleep in the loop, a bigger sleeptime value reduces the use of CPU but degrades the performance.

This comment has been minimized.

@sroze

sroze Apr 4, 2018

Member

Obviously, there are pros and cons to both methods. Your previous reviews suggested we shouldn't use the consume method as it wasn't reliable and had some "strange behaviour". Let's try to get as close to the fact as possible. Here is a comparison table of the two approaches:

consume method get method
Pros
  • Is blocking (low CPU impact for the loop)
  • Easy signal handling capabilities
  • Will be relatively easy to support geting from multiple queues
  • Allow us to adds ticks while waiting for the messages
Cons
  • Does not handle signals well
  • Can't easily handle multiple queues (as blocking)
  • Has "strange behaviour" for ext-amqp < 1.9.3 (which is the last one)
  • Messages might be mixed up between queues
  • Higher CPU footprint because of the loop
  • It not visible in RabbitMq's UI
  • Does not support QoS

This comment has been minimized.

@makasim

makasim Apr 4, 2018

Contributor

There is no any strangeness in basic.conume itself, it comes from the way it was used and there is a solution for it a buffer.

I am all for basic consume properly applied .

This comment has been minimized.

@sroze

sroze Apr 4, 2018

Member

That said, I suggest we go for the get method option which seems to be the one that will provide the greater stability. Later, if we some use-cases/issues and believe this was not the correct choice, we can add an option to use the consume method instead easily. That way we move forward and stay open to changes :)

@@ -970,12 +970,21 @@ private function addMessengerSection(ArrayNodeDefinition $rootNode)
->arrayNode('messenger')
->info('Messenger configuration')
->{!class_exists(FullStack::class) && class_exists(MessageBusInterface::class) ? 'canBeDisabled' : 'canBeEnabled'}()
->validate()
->ifTrue(function ($v) { return isset($v['adapter']) && isset($v['adapters']); })
->thenInvalid('"adapter" and "adapters" cannot be used together.')

This comment has been minimized.

@ogizanagi

ogizanagi Apr 4, 2018

Member

Did you removed an adapter key at some point? This seems useless now.

This comment has been minimized.

@ogizanagi

ogizanagi Apr 4, 2018

Member

DX-wise, the adapter key would have been interesting to me though, as projects with multiple adapters might not be so common. So the Messenger recipe would have been:

parameters:
    env(MESSENGER_ADAPTER_DSN): null

framework:
    messenger:
        # Provide the "MESSENGER_ADAPTER_DSN" env var value to enable a "default" adapter.
        adapter: "%env(MESSENGER_ADAPTER_DSN)%"

and just have to uncomment the var in .env. (We should allow null as framework.adapter value and consider there is no adapter).

This comment has been minimized.

@sroze

sroze Apr 4, 2018

Member

True, I removed it. It appears that DX-wise, it does not change anything actually thanks to the recipe (i.e. framework.messenger.adapters.default configured by default - maybe commented). And if we put framework.messenger.adapter in the recipe, it's less obvious they can use multiple adapters.

customised:
dsn: 'amqp://localhost/%2f/messages?exchange_name=exchange_name'
options:
queue_name: Queue

This comment has been minimized.

@ogizanagi

ogizanagi Apr 4, 2018

Member

Missing newline

This comment has been minimized.

@sroze

sroze Apr 5, 2018

Member

Added 👍

@@ -543,6 +544,33 @@ public function testMessengerValidationDisabled()
$this->assertFalse($container->hasDefinition('messenger.middleware.validator'));
}
public function testMessengerAdapter()
{
$container = $this->createContainerFromFile('messenger_adapter');

This comment has been minimized.

@ogizanagi

ogizanagi Apr 4, 2018

Member

So, the messenger key can be enabled even if the component is not part of the composer.json dev reqs?
I suspect the FrameworkExtension is missing something like:

if (!class_exists(MessageBusInterface::class)) {
    throw new LogicException('Messenger cannot be enabled as the Messenger component is not installed.');
}

😄

This comment has been minimized.

@sroze

sroze Apr 4, 2018

Member

Good catch, will handle that in another PR 😉

This comment has been minimized.

@sroze

sroze Apr 8, 2018

Member

You fixed it in #26816, thank you :)

$senderArguments = $container->getDefinition('messenger.customised_sender')->getArguments();
$this->assertEquals(array(new Reference('messenger.adapter_factory'), 'createSender'), $senderFactory);
$this->assertEquals(2, count($senderArguments));

This comment has been minimized.

@ogizanagi

ogizanagi Apr 4, 2018

Member

assertCount ? (same for other occurrences)

@@ -1470,6 +1470,24 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
} else {
$container->removeDefinition('messenger.middleware.validator');
}
foreach ($config['adapters'] as $name => $adapter) {
$container->setDefinition('messenger.'.$name.'_sender', (new Definition(SenderInterface::class))->setFactory(array(

This comment has been minimized.

@ogizanagi

ogizanagi Apr 4, 2018

Member

Usually, it's 'messenger.sender.'.$name rather than 'messenger.'.$name.'_sender'. Same for receivers.

This comment has been minimized.

@sroze

sroze Apr 4, 2018

Member

Happy to change but... What do you mean by "usually"? :)

This comment has been minimized.

@ogizanagi

ogizanagi Apr 4, 2018

Member

I mean that's AFAIK what most service factories will choose as format for their ids (see security listeners concrete instances or user_checkers named after firewalls, workflows, Monolog handlers, ...) but also what we do anyway to gather similar services under an identifiable "namespaces".

This comment has been minimized.

@sroze

sroze Apr 5, 2018

Member

Fair enough. Changed.

public function createSender(string $dsn, array $options): SenderInterface;
public function supports(string $dsn, array $options): bool;

This comment has been minimized.

@ogizanagi

ogizanagi Apr 4, 2018

Member

Just asking: it's admitted any adapter factory will provide both sender and receiver factories?
What if I only need a sender? I'd rather create the sender service myself and tag it?

This comment has been minimized.

@sroze

sroze Apr 4, 2018

Member

Tbh, you can skip the AdapterFactoryInterface if you don't want the developers to use the DSN to create your senders/receivers. Registering and tagging the sender/receiver still works. Also, you could create a NullSender or NullAdapter.

* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Adapter\PhpAmqp;

This comment has been minimized.

@ogizanagi

ogizanagi Apr 4, 2018

Member

👍 for AmqpExt

throw $e;
}
if (function_exists('pcntl_signal_dispatch')) {

This comment has been minimized.

@ogizanagi

ogizanagi Apr 4, 2018

Member

Not in a finally? Not needed?

This comment has been minimized.

@sroze

sroze Apr 4, 2018

Member

That's a good shot, this could be in the finally. I don't know if it's really needed but doesn't cost :)

@davidbarratt

This comment has been minimized.

Contributor

davidbarratt commented Apr 8, 2018

Any chance on this getting merged soon? Thanks! :)

public function createQueue(\AMQPChannel $channel): \AMQPQueue
{
return new \AMQPQueue($channel);

This comment has been minimized.

@davidbarratt

davidbarratt Apr 8, 2018

Contributor

I think these \AMQP classes are only in the amqp pecl extension? if so, it should be a requirement in composer.json
https://getcomposer.org/doc/04-schema.md#package-links
something like

"ext-amqp": "^1.0.0"

unless there's an alternative?

This comment has been minimized.

@sroze

sroze Apr 8, 2018

Member

That's correct. We can't add the requirement in this composer.json file as it's not a requirement for the entire component, but for its AMQP adapter only. Though you are right in pointing that we don't handle properly the case when the extension is not installed: we can add a nice exception message when trying to create the adapter while the extension does not exist.

This comment has been minimized.

@davidbarratt

davidbarratt Apr 8, 2018

Contributor

good to know! I'm not going to hold up the train for that, but probably a good idea to add at some point.

@dunglas

dunglas approved these changes Apr 8, 2018

@sroze sroze merged commit 798c230 into symfony:master Apr 12, 2018

1 of 3 checks passed

continuous-integration/appveyor/pr Waiting for AppVeyor build to complete
Details
continuous-integration/travis-ci/pr The Travis CI build is in progress
Details
fabbot.io Your code looks good.
Details

sroze added a commit that referenced this pull request Apr 12, 2018

feature #26632 [Messenger] Add AMQP adapter (sroze)
This PR was squashed before being merged into the 4.1-dev branch (closes #26632).

Discussion
----------

[Messenger] Add AMQP adapter

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | ø
| License       | MIT

- [x] Depends on the Messenger component #24411
- [x] Add tests once we are all happy about the structure

---

In order to give a great DX for simple needs such as sending messages through an AMQP broker such as RabbitMq, we should ship an AMQP adapter for the Messenger component within Symfony Core. It should be as simple as this proposal. We don't need to handle more specific use-cases nor brokers as other adapters such as the [enqueue adapter](https://github.com/sroze/enqueue-bridge) can also be used.

Configuring the adapter is as simple as the following configuration:
```yaml
# config/packages/messenger_adapters.yaml
framework:
    messenger:
        adapter: "%env(MESSENGER_DSN)%"
```

With the given `.env` for example:
```
MESSENGER_DSN=amqp://guest:guest@localhost:5672/%2f/messages
```

Keep in mind that after having configured the adapter, developers have to route their messages to the given adapter.

```yaml
# config/packages/messenger_routes.yaml
framework:
    messenger:
        routing:
producer).
            'App\Message\Command\CreateNumber': messenger.default_sender
```

---

Additionally, multiple adapters can be created and messages routed to these ones.

```yaml
# config/packages/messenger_routes.yaml
framework:
    messenger:
        adapters:
            commands: "amqp://guest:guest@localhost:5672/%2f/commands"
            maintenance: "amqp://guest:guest@localhost:5672/%2f/maintenance"
        routing:
producer).
            'App\Message\Command\CreateNumber': messenger.commands_sender
            'App\Message\Command\MaintenanceSpecificCommand': messenger.maintenance_sender
```

Commits
-------

798c230 [Messenger] Add AMQP adapter

@fabpot fabpot referenced this pull request May 7, 2018

Merged

Release v4.1.0-BETA1 #27181

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment