Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Messenger] Add a middleware that wraps all handlers in one Doctrine transaction. #26647

Closed
wants to merge 24 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,55 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Bridge\Doctrine\Messenger;

use Doctrine\Common\Persistence\ManagerRegistry;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\MiddlewareInterface;

/**
* Wraps all handlers in a single doctrine transaction.
*
* @author Tobias Nyholm <tobias.nyholm@gmail.com>
*/
class DoctrineTransactionMiddleware implements MiddlewareInterface
{
private $managerRegistry;
private $entityManagerName;

public function __construct(ManagerRegistry $managerRegistry, ?string $entityManagerName)
{
$this->managerRegistry = $managerRegistry;
$this->entityManagerName = $entityManagerName;
}

public function handle($message, callable $next)
{
$entityManager = $this->managerRegistry->getManager($this->entityManagerName);

if (!$entityManager instanceof EntityManagerInterface) {
throw new \InvalidArgumentException(sprintf('The ObjectManager with name "%s" must be an instance of EntityManagerInterface', $this->entityManagerName));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing dot at the end of the exception message.

}

$entityManager->getConnection()->beginTransaction();
try {
$result = $next($message);
$entityManager->flush();
$entityManager->getConnection()->commit();
} catch (\Throwable $exception) {
$entityManager->getConnection()->rollBack();

throw $exception;
}

return $result;
}
}
Expand Up @@ -969,13 +969,27 @@ private function addMessengerSection(ArrayNodeDefinition $rootNode)
->children()
->arrayNode('routing')
->useAttributeAsKey('message_class')
->beforeNormalization()
->always()
->then(function ($config) {
$newConfig = array();
foreach ($config as $k => $v) {
if (!is_int($k)) {
$newConfig[$k] = array('senders' => is_array($v) ? array_values($v) : array($v));
} else {
$newConfig[$v['message-class']]['senders'] = array_map(
function ($a) {
return is_string($a) ? $a : $a['service'];
},
array_values($v['sender'])
);
}
}

return $newConfig;
})
->end()
->prototype('array')
->beforeNormalization()
->ifString()
->then(function ($v) {
return array('senders' => array($v));
})
->end()
->children()
->arrayNode('senders')
->requiresAtLeastOneElement()
Expand All @@ -984,6 +998,16 @@ private function addMessengerSection(ArrayNodeDefinition $rootNode)
->end()
->end()
->end()
->arrayNode('middlewares')
->addDefaultsIfNotSet()
->children()
->arrayNode('doctrine_transaction')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not be configured via Framework, but via DoctrineBundle. We don't have any other Doctrine config here AFAIR.

Copy link
Contributor

@sroze sroze Mar 27, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fabpot are you happy about the middleware being in the bridge, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fabpot actually, the cache also relies on Doctrine in FrameworkBundle. Note that this doesn't configure Doctrine either it justs allows enables the middleware. I'll submit a PR to remove it from here but I believe it might makes sense (follows the same pattern as cache configuration) to be here 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#26684 is in.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also think this makes sense as is. Moving it to doctrine bundle would mean configuring messenger middlewares in two different places, I can't think of a good final result.
We enable logging features for some components via framework config in the same way

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @chalasr , it's only a feature flag. It does no harm having this in the fwb whereas having to configure middlewares in 2 different config keys is really bad, DX wise. WDYT?

->canBeEnabled()
->children()
->scalarNode('entity_manager_name')->info('The name of the entity manager to use')->defaultNull()->end()
->end()
->end()
->end()
->end()
->end()
->end()
Expand Down
Expand Up @@ -13,6 +13,7 @@

use Doctrine\Common\Annotations\Reader;
use Doctrine\Common\Annotations\AnnotationRegistry;
use Symfony\Bridge\Doctrine\Messenger\DoctrineTransactionMiddleware;
use Symfony\Bridge\Monolog\Processor\DebugProcessor;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Bundle\FrameworkBundle\Controller\Controller;
Expand Down Expand Up @@ -1445,6 +1446,15 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder

$container->getDefinition('messenger.sender_locator')->replaceArgument(0, $senderLocatorMapping);
$container->getDefinition('messenger.asynchronous.routing.sender_locator')->replaceArgument(1, $messageToSenderIdsMapping);

if ($config['middlewares']['doctrine_transaction']['enabled']) {
if (!class_exists(DoctrineTransactionMiddleware::class)) {
throw new LogicException('The Doctrine transaction middleware is only available when the doctrine bridge is installed. Try running "composer require symfony/doctrine-bridge".');
}
$container->getDefinition('messenger.middleware.doctrine_transaction')->replaceArgument(1, $config['middlewares']['doctrine_transaction']['entity_manager_name']);
} else {
$container->removeDefinition('messenger.middleware.doctrine_transaction');
}
}

private function registerCacheConfiguration(array $config, ContainerBuilder $container)
Expand Down
Expand Up @@ -25,6 +25,13 @@
<tag name="message_bus_middleware" priority="-10" />
</service>

<service id="messenger.middleware.doctrine_transaction" class="Symfony\Bridge\Doctrine\Messenger\DoctrineTransactionMiddleware">
<argument type="service" id="doctrine" />
<argument /> <!-- Entity manager name -->

<tag name="message_bus_middleware" priority="9" />
</service>

<!-- Asynchronous -->
<service id="messenger.asynchronous.routing.sender_locator" class="Symfony\Component\Messenger\Asynchronous\Routing\SenderLocator">
<argument type="service" id="messenger.sender_locator" />
Expand Down
Expand Up @@ -30,6 +30,7 @@
<xsd:element name="cache" type="cache" minOccurs="0" maxOccurs="1" />
<xsd:element name="workflow" type="workflow" minOccurs="0" maxOccurs="unbounded" />
<xsd:element name="lock" type="lock" minOccurs="0" maxOccurs="1" />
<xsd:element name="messenger" type="messenger" minOccurs="0" maxOccurs="1" />
</xsd:choice>

<xsd:attribute name="http-method-override" type="xsd:boolean" />
Expand Down Expand Up @@ -342,4 +343,33 @@
</xsd:extension>
</xsd:simpleContent>
</xsd:complexType>

<xsd:complexType name="messenger">
<xsd:sequence>
<xsd:element name="routing" type="messenger_routing" minOccurs="0" maxOccurs="unbounded" />
<xsd:element name="middlewares" type="messenger_middleware" minOccurs="0" maxOccurs="unbounded" />
</xsd:sequence>
</xsd:complexType>

<xsd:complexType name="messenger_routing">
<xsd:choice minOccurs="0" maxOccurs="unbounded">
<xsd:element name="sender" type="messenger_routing_sender" />
</xsd:choice>
<xsd:attribute name="message-class" type="xsd:string" use="required"/>
</xsd:complexType>

<xsd:complexType name="messenger_routing_sender">
<xsd:attribute name="service" type="xsd:string" use="required"/>
</xsd:complexType>

<xsd:complexType name="messenger_middleware">
<xsd:sequence>
<xsd:element name="doctrine-transaction" type="messenger_doctrine_transaction" minOccurs="0" maxOccurs="1" />
</xsd:sequence>
</xsd:complexType>

<xsd:complexType name="messenger_doctrine_transaction">
<xsd:attribute name="entity-manager-name" type="xsd:string" />
<xsd:attribute name="enabled" type="xsd:boolean" />
</xsd:complexType>
</xsd:schema>
Expand Up @@ -253,6 +253,12 @@ class_exists(SemaphoreStore::class) && SemaphoreStore::isSupported() ? 'semaphor
'messenger' => array(
'enabled' => !class_exists(FullStack::class) && class_exists(MessageBusInterface::class),
'routing' => array(),
'middlewares' => array(
'doctrine_transaction' => array(
'enabled' => false,
'entity_manager_name' => null,
),
),
),
);
}
Expand Down
@@ -0,0 +1,10 @@
<?php

$container->loadFromExtension('framework', array(
'messenger' => array(
'routing' => array(
'App\Bar' => array('sender.bar', 'sender.biz'),
'App\Foo' => 'sender.foo',
),
),
));
@@ -0,0 +1,11 @@
<?php

$container->loadFromExtension('framework', array(
'messenger' => array(
'middlewares' => array(
'doctrine_transaction' => array(
'entity_manager_name' => 'foobar',
),
),
),
));
@@ -0,0 +1,19 @@
<?xml version="1.0" encoding="utf-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:framework="http://symfony.com/schema/dic/symfony"
xsi:schemaLocation="http://symfony.com/schema/dic/services http://symfony.com/schema/dic/services/services-1.0.xsd
http://symfony.com/schema/dic/symfony http://symfony.com/schema/dic/symfony/symfony-1.0.xsd">

<framework:config>
<framework:messenger>
<framework:routing message-class="App\Bar">
<framework:sender service="sender.bar" />
<framework:sender service="sender.biz" />
</framework:routing>
<framework:routing message-class="App\Foo">
<framework:sender service="sender.foo" />
</framework:routing>
</framework:messenger>
</framework:config>
</container>
@@ -0,0 +1,15 @@
<?xml version="1.0" encoding="utf-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:framework="http://symfony.com/schema/dic/symfony"
xsi:schemaLocation="http://symfony.com/schema/dic/services http://symfony.com/schema/dic/services/services-1.0.xsd
http://symfony.com/schema/dic/symfony http://symfony.com/schema/dic/symfony/symfony-1.0.xsd">

<framework:config>
<framework:messenger>
<framework:middlewares>
<framework:doctrine-transaction entity-manager-name="foobar" />
</framework:middlewares>
</framework:messenger>
</framework:config>
</container>
@@ -0,0 +1,5 @@
framework:
messenger:
routing:
'App\Bar': ['sender.bar', 'sender.biz']
'App\Foo': 'sender.foo'
@@ -0,0 +1,5 @@
framework:
messenger:
middlewares:
doctrine_transaction:
entity_manager_name: 'foobar'
Expand Up @@ -12,6 +12,7 @@
namespace Symfony\Bundle\FrameworkBundle\Tests\DependencyInjection;

use Doctrine\Common\Annotations\Annotation;
use Symfony\Bridge\Doctrine\ContainerAwareEventManager;
use Symfony\Bundle\FullStack;
use Symfony\Bundle\FrameworkBundle\Tests\TestCase;
use Symfony\Bundle\FrameworkBundle\DependencyInjection\Compiler\AddAnnotationsCachedReaderPass;
Expand Down Expand Up @@ -489,6 +490,24 @@ public function testWebLink()
$this->assertTrue($container->hasDefinition('web_link.add_link_header_listener'));
}

public function testMessenger()
{
$container = $this->createContainerFromFile('messenger');
$this->assertFalse($container->hasDefinition('messenger.middleware.doctrine_transaction'));
}

public function testMessengerDoctrine()
{
if (!class_exists(ContainerAwareEventManager::class)) {
self::markTestSkipped('Skipping tests since Doctrine bridge is not installed');
}

$container = $this->createContainerFromFile('messenger_doctrine');
$this->assertTrue($container->hasDefinition('messenger.middleware.doctrine_transaction'));
$def = $container->getDefinition('messenger.middleware.doctrine_transaction');
$this->assertEquals('foobar', $def->getArgument(1));
}

public function testTranslator()
{
$container = $this->createContainerFromFile('full');
Expand Down