Skip to content

Commit

Permalink
bug #43133 [Messenger] Move container resetting after receiver acknow…
Browse files Browse the repository at this point in the history
…ledging (upyx)

This PR was merged into the 5.4 branch.

Discussion
----------

[Messenger] Move container resetting after receiver acknowledging

| Q             | A
| ------------- | ---
| Branch?       | 5.4
| Bug fix?      | yes
| New feature?  | no
| Deprecations? | no
| Tickets       | Fix #43114
| License       | MIT
| Doc PR        | nope

It fixes the #43114 issue and similar potential bugs. The `ResetServicesListener` lean on events sequence: the `AbstractWorkerMessageEvent` to save the actual receiver name, then the `WorkerRunningEvent` to check that name and reset the container. It may look fragile, so let's discuss it in the issue.

Commits
-------

fc85aef [Messenger] Move container resetting after receiver acknowledging (in command)
  • Loading branch information
fabpot committed Sep 27, 2021
2 parents 28567f3 + fc85aef commit 8653b33
Show file tree
Hide file tree
Showing 52 changed files with 269 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1333,10 +1333,6 @@ function ($a) {
->fixXmlConfig('option')
->children()
->scalarNode('dsn')->end()
->booleanNode('reset_on_message')
->defaultFalse()
->info('Reset container services after each message. Turn it on when the transport is async and run in a worker.')
->end()
->scalarNode('serializer')->defaultNull()->info('Service id of a custom serializer to use.')->end()
->arrayNode('options')
->normalizeKeys(false)
Expand Down Expand Up @@ -1374,6 +1370,10 @@ function ($a) {
->defaultNull()
->info('Transport name to send failed messages to (after all retries have failed).')
->end()
->booleanNode('reset_on_message')
->defaultNull()
->info('Reset container services after each message.')
->end()
->scalarNode('default_bus')->defaultNull()->end()
->arrayNode('buses')
->defaultValue(['messenger.bus.default' => ['default_middleware' => true, 'middleware' => []]])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1977,7 +1977,6 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder

$senderAliases = [];
$transportRetryReferences = [];
$transportNamesForResetServices = [];
foreach ($config['transports'] as $name => $transport) {
$serializerId = $transport['serializer'] ?? 'messenger.default_serializer';
$transportDefinition = (new Definition(TransportInterface::class))
Expand Down Expand Up @@ -2006,18 +2005,6 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder

$transportRetryReferences[$name] = new Reference($retryServiceId);
}
if ($transport['reset_on_message']) {
$transportNamesForResetServices[] = $name;
}
}

if ($transportNamesForResetServices) {
$container
->getDefinition('messenger.listener.reset_services')
->replaceArgument(1, $transportNamesForResetServices)
;
} else {
$container->removeDefinition('messenger.listener.reset_services');
}

$senderReferences = [];
Expand Down Expand Up @@ -2089,6 +2076,17 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
$container->removeDefinition('console.command.messenger_failed_messages_show');
$container->removeDefinition('console.command.messenger_failed_messages_remove');
}

if (false === $config['reset_on_message']) {
throw new LogicException('The "framework.messenger.reset_on_message" configuration option can be set to "true" only. To prevent services resetting after each message you can set the "--no-reset" option in "messenger:consume" command.');
}

if (null === $config['reset_on_message']) {
trigger_deprecation('symfony/framework-bundle', '5.4', 'Not setting the "framework.messenger.reset_on_message" configuration option is deprecated, it will default to "true" in version 6.0.');

$container->getDefinition('console.command.messenger_consume_messages')->replaceArgument(5, null);
$container->removeDefinition('messenger.listener.reset_services');
}
}

private function registerCacheConfiguration(array $config, ContainerBuilder $container)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
service('event_dispatcher'),
service('logger')->nullOnInvalid(),
[], // Receiver names
service('messenger.listener.reset_services')->nullOnInvalid(),
])
->tag('console.command')
->tag('monolog.logger', ['channel' => 'messenger'])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@
service('services_resetter'),
abstract_arg('receivers names'),
])
->tag('kernel.event_subscriber')

->set('messenger.routable_message_bus', RoutableMessageBus::class)
->args([
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@
<xsd:attribute name="default-bus" type="xsd:string" />
<xsd:attribute name="enabled" type="xsd:boolean" />
<xsd:attribute name="failure-transport" type="xsd:string" />
<xsd:attribute name="reset-on-message" type="xsd:boolean" />
</xsd:complexType>

<xsd:complexType name="messenger_serializer">
Expand Down Expand Up @@ -505,7 +506,6 @@
<xsd:attribute name="serializer" type="xsd:string" />
<xsd:attribute name="dsn" type="xsd:string" />
<xsd:attribute name="failure-transport" type="xsd:string" />
<xsd:attribute name="reset-on-message" type="xsd:boolean" />
</xsd:complexType>

<xsd:complexType name="messenger_retry_strategy">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,7 @@ class_exists(SemaphoreStore::class) && SemaphoreStore::isSupported() ? 'semaphor
],
'default_bus' => null,
'buses' => ['messenger.bus.default' => ['default_middleware' => true, 'middleware' => []]],
'reset_on_message' => null,
],
'disallow_search_engine_index' => true,
'http_client' => [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

$container->loadFromExtension('framework', [
'messenger' => [
'reset_on_message' => true,
'routing' => [
FooMessage::class => ['sender.bar', 'sender.biz'],
BarMessage::class => 'sender.foo',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

$container->loadFromExtension('framework', [
'messenger' => [
'reset_on_message' => true,
'buses' => [
'command_bus' => [
'middleware' => [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

$container->loadFromExtension('framework', [
'messenger' => [
'reset_on_message' => true,
'default_bus' => 'messenger.bus.commands',
'buses' => [
'messenger.bus.commands' => null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

$container->loadFromExtension('framework', [
'messenger' => [
'reset_on_message' => true,
'transports' => [
'transport_1' => [
'dsn' => 'null://',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
$container->loadFromExtension('framework', [
'messenger' => [
'failure_transport' => 'failure_transport_global',
'reset_on_message' => true,
'transports' => [
'transport_1' => [
'dsn' => 'null://',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
$container->loadFromExtension('framework', [
'serializer' => true,
'messenger' => [
'reset_on_message' => true,
'serializer' => [
'default_serializer' => 'messenger.transport.symfony_serializer',
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
$container->loadFromExtension('framework', [
'serializer' => true,
'messenger' => [
'reset_on_message' => true,
'serializer' => [
'default_serializer' => 'messenger.transport.symfony_serializer',
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

$container->loadFromExtension('framework', [
'messenger' => [
'reset_on_message' => true,
'routing' => [
'Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\DummyMessage' => ['amqp'],
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
$container->loadFromExtension('framework', [
'serializer' => true,
'messenger' => [
'reset_on_message' => true,
'serializer' => [
'default_serializer' => 'messenger.transport.symfony_serializer',
'symfony_serializer' => [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
'serializer' => true,
'messenger' => [
'failure_transport' => 'failed',
'reset_on_message' => true,
'serializer' => [
'default_serializer' => 'messenger.transport.symfony_serializer',
],
'transports' => [
'default' => 'amqp://localhost/%2f/messages',
'customised' => [
'dsn' => 'amqp://localhost/%2f/messages?exchange_name=exchange_name',
'reset_on_message' => true,
'options' => ['queue' => ['name' => 'Queue']],
'serializer' => 'messenger.transport.native_php_serializer',
'retry_strategy' => [
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?php

use Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\BarMessage;
use Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\FooMessage;

$container->loadFromExtension('framework', [
'messenger' => [
'reset_on_message' => false,
'routing' => [
FooMessage::class => ['sender.bar', 'sender.biz'],
BarMessage::class => 'sender.foo',
],
'transports' => [
'sender.biz' => 'null://',
'sender.bar' => 'null://',
'sender.foo' => 'null://',
],
],
]);
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

use Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\BarMessage;
use Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\FooMessage;

$container->loadFromExtension('framework', [
'messenger' => [
'routing' => [
FooMessage::class => ['sender.bar', 'sender.biz'],
BarMessage::class => 'sender.foo',
],
'transports' => [
'sender.biz' => 'null://',
'sender.bar' => 'null://',
'sender.foo' => 'null://',
],
],
]);
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@

$container->loadFromExtension('framework', [
'messenger' => [
'enabled' => true
'enabled' => true,
'reset_on_message' => true,
],
'mailer' => [
'dsn' => 'smtp://example.com',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
],
'messenger' => [
'enabled' => true,
'reset_on_message' => true,
],
'notifier' => [
'enabled' => true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">

<framework:config>
<framework:messenger>
<framework:messenger reset-on-message="true">
<framework:routing message-class="Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\FooMessage">
<framework:sender service="sender.bar" />
<framework:sender service="sender.biz" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">

<framework:config>
<framework:messenger default-bus="messenger.bus.commands">
<framework:messenger default-bus="messenger.bus.commands" reset-on-message="true">
<framework:bus name="messenger.bus.commands" />
<framework:bus name="messenger.bus.events">
<framework:middleware id="with_factory">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">

<framework:config>
<framework:messenger>
<framework:messenger reset-on-message="true">
<framework:transport name="transport_1" dsn="null://" failure-transport="failure_transport_1" />
<framework:transport name="transport_2" dsn="null://" />
<framework:transport name="transport_3" dsn="null://" failure-transport="failure_transport_3" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">

<framework:config>
<framework:messenger failure-transport="failure_transport_global">
<framework:messenger failure-transport="failure_transport_global" reset-on-message="true">
<framework:transport name="transport_1" dsn="null://" failure-transport="failure_transport_1" />
<framework:transport name="transport_2" dsn="null://" />
<framework:transport name="transport_3" dsn="null://" failure-transport="failure_transport_3" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<framework:config>
<framework:serializer enabled="true" />
<framework:messenger>
<framework:messenger reset-on-message="true">
<framework:serializer default-serializer="messenger.transport.symfony_serializer" />
<framework:routing message-class="Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\DummyMessage">
<framework:sender service="amqp" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<framework:config>
<framework:serializer enabled="true" />
<framework:messenger>
<framework:messenger reset-on-message="true">
<framework:serializer default-serializer="messenger.transport.symfony_serializer" />
<framework:routing message-class="Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\DummyMessage">
<framework:sender service="invalid" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">

<framework:config>
<framework:messenger>
<framework:messenger reset-on-message="true">
<framework:routing message-class="Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\DummyMessage">
<framework:sender service="amqp" />
</framework:routing>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<framework:config>
<framework:serializer enabled="true" />
<framework:messenger>
<framework:messenger reset-on-message="true">
<framework:serializer default-serializer="messenger.transport.symfony_serializer">
<framework:symfony-serializer format="csv">
<framework:context>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@

<framework:config>
<framework:serializer enabled="true" />
<framework:messenger failure-transport="failed">
<framework:messenger failure-transport="failed" reset-on-message="true">
<framework:serializer default-serializer="messenger.transport.symfony_serializer" />
<framework:transport name="default" dsn="amqp://localhost/%2f/messages" />
<framework:transport name="customised" dsn="amqp://localhost/%2f/messages?exchange_name=exchange_name" serializer="messenger.transport.native_php_serializer" reset-on-message="true">
<framework:transport name="customised" dsn="amqp://localhost/%2f/messages?exchange_name=exchange_name" serializer="messenger.transport.native_php_serializer">
<framework:options>
<framework:queue>
<framework:name>Queue</framework:name>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?xml version="1.0" encoding="utf-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:framework="http://symfony.com/schema/dic/symfony"
xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd
http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">

<framework:config>
<framework:messenger reset-on-message="false">
<framework:routing message-class="Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\FooMessage">
<framework:sender service="sender.bar" />
<framework:sender service="sender.biz" />
</framework:routing>
<framework:routing message-class="Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\BarMessage">
<framework:sender service="sender.foo" />
</framework:routing>
<framework:transport name="sender.bar" dsn="null://" />
<framework:transport name="sender.biz" dsn="null://" />
<framework:transport name="sender.foo" dsn="null://" />
</framework:messenger>
</framework:config>
</container>
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?xml version="1.0" encoding="utf-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:framework="http://symfony.com/schema/dic/symfony"
xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd
http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">

<framework:config>
<framework:messenger>
<framework:routing message-class="Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\FooMessage">
<framework:sender service="sender.bar" />
<framework:sender service="sender.biz" />
</framework:routing>
<framework:routing message-class="Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\BarMessage">
<framework:sender service="sender.foo" />
</framework:routing>
<framework:transport name="sender.bar" dsn="null://" />
<framework:transport name="sender.biz" dsn="null://" />
<framework:transport name="sender.foo" dsn="null://" />
</framework:messenger>
</framework:config>
</container>
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">

<framework:config>
<framework:messenger enabled="true" />
<framework:messenger enabled="true" reset-on-message="true" />
<framework:mailer dsn="smtp://example.com" />
<framework:notifier enabled="true" notification-on-failed-messages="true">
<framework:chatter-transport name="slack">null</framework:chatter-transport>
Expand Down
Loading

0 comments on commit 8653b33

Please sign in to comment.