Skip to content

Commit 09b4afa

Browse files
committed
Async event dispatcher
1 parent 18c5cc4 commit 09b4afa

File tree

12 files changed

+350
-0
lines changed

12 files changed

+350
-0
lines changed
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
<?php
2+
3+
namespace Enqueue\Bundle\DependencyInjection\Compiler;
4+
5+
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
6+
use Symfony\Component\DependencyInjection\ContainerBuilder;
7+
use Symfony\Component\EventDispatcher\DependencyInjection\RegisterListenersPass;
8+
9+
class AsyncEventsPass implements CompilerPassInterface
10+
{
11+
/**
12+
* {@inheritdoc}
13+
*/
14+
public function process(ContainerBuilder $container)
15+
{
16+
if (false == $container->hasDefinition('enqueue.events.async_listener')) {
17+
return;
18+
}
19+
20+
$registeredToEvent = [];
21+
foreach ($container->findTaggedServiceIds('kernel.event_listener') as $serviceId => $tagAttributes) {
22+
foreach ($tagAttributes as $tagAttribute) {
23+
if (false == isset($tagAttribute['async'])) {
24+
continue;
25+
}
26+
27+
$service = $container->getDefinition($serviceId);
28+
29+
$service->clearTag('kernel.event_listener');
30+
$service->addTag('enqueue.async_event_listener', $tagAttribute);
31+
32+
if (false == isset($registeredToEvent[$tagAttribute['event']])) {
33+
$container->getDefinition('enqueue.events.async_listener')
34+
->addTag('kernel.event_listener', [
35+
'event' => $tagAttribute['event'],
36+
'method' => 'onEvent',
37+
])
38+
;
39+
40+
$registeredToEvent[$tagAttribute['event']] = true;
41+
}
42+
}
43+
}
44+
45+
$registerListenersPass = new RegisterListenersPass(
46+
'enqueue.events.event_dispatcher',
47+
'enqueue.async_event_listener',
48+
'enqueue.async_event_subscriber'
49+
);
50+
$registerListenersPass->process($container);
51+
}
52+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<?php
2+
3+
namespace Enqueue\Bundle\DependencyInjection\Compiler;
4+
5+
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
6+
use Symfony\Component\DependencyInjection\ContainerBuilder;
7+
8+
class AsyncTransformersPass implements CompilerPassInterface
9+
{
10+
/**
11+
* {@inheritdoc}
12+
*/
13+
public function process(ContainerBuilder $container)
14+
{
15+
if (false == $container->hasDefinition('enqueue.events.registry')) {
16+
return;
17+
}
18+
19+
$map = [];
20+
foreach ($container->findTaggedServiceIds('enqueue.event_transformer') as $serviceId => $tagAttributes) {
21+
foreach ($tagAttributes as $tagAttribute) {
22+
$map[$tagAttribute['event']] = $serviceId;
23+
}
24+
}
25+
26+
$container->getDefinition('enqueue.events.registry')
27+
->replaceArgument(0, $map)
28+
;
29+
}
30+
}

DependencyInjection/Configuration.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ public function getConfigTreeBuilder()
5151
->integerNode('redelivered_delay_time')->min(0)->defaultValue(0)->end()
5252
->end()->end()
5353
->booleanNode('job')->defaultFalse()->end()
54+
->booleanNode('async_events')->defaultFalse()->end()
5455
->arrayNode('extensions')->addDefaultsIfNotSet()->children()
5556
->booleanNode('doctrine_ping_connection_extension')->defaultFalse()->end()
5657
->booleanNode('doctrine_clear_identity_map_extension')->defaultFalse()->end()

DependencyInjection/EnqueueExtension.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,10 @@ public function load(array $configs, ContainerBuilder $container)
112112
$loader->load('job.yml');
113113
}
114114

115+
if ($config['async_events']) {
116+
$loader->load('events.yml');
117+
}
118+
115119
if ($config['extensions']['doctrine_ping_connection_extension']) {
116120
$loader->load('extensions/doctrine_ping_connection_extension.yml');
117121
}

EnqueueBundle.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
use Enqueue\AmqpExt\AmqpContext;
66
use Enqueue\AmqpExt\Symfony\AmqpTransportFactory;
77
use Enqueue\AmqpExt\Symfony\RabbitMqAmqpTransportFactory;
8+
use Enqueue\Bundle\DependencyInjection\Compiler\AsyncEventsPass;
9+
use Enqueue\Bundle\DependencyInjection\Compiler\AsyncTransformersPass;
810
use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientExtensionsPass;
911
use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientRoutingPass;
1012
use Enqueue\Bundle\DependencyInjection\Compiler\BuildConsumptionExtensionsPass;
@@ -39,6 +41,8 @@ public function build(ContainerBuilder $container)
3941
$container->addCompilerPass(new BuildTopicMetaSubscribersPass());
4042
$container->addCompilerPass(new BuildQueueMetaRegistryPass());
4143
$container->addCompilerPass(new BuildClientExtensionsPass());
44+
$container->addCompilerPass(new AsyncEventsPass());
45+
$container->addCompilerPass(new AsyncTransformersPass());
4246

4347
/** @var EnqueueExtension $extension */
4448
$extension = $container->getExtension('enqueue');

Events/AsyncListener.php

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
<?php
2+
3+
namespace Enqueue\Bundle\Events;
4+
5+
use Enqueue\Client\ProducerInterface;
6+
use Symfony\Component\EventDispatcher\Event;
7+
8+
class AsyncListener
9+
{
10+
/**
11+
* @var ProducerInterface
12+
*/
13+
private $producer;
14+
15+
/**
16+
* @var Registry
17+
*/
18+
private $registry;
19+
20+
/**
21+
* @var bool
22+
*/
23+
private $syncMode;
24+
25+
/**
26+
* @param ProducerInterface $producer
27+
* @param Registry $registry
28+
*/
29+
public function __construct(ProducerInterface $producer, Registry $registry)
30+
{
31+
$this->producer = $producer;
32+
$this->registry = $registry;
33+
}
34+
35+
public function syncMode($eventName)
36+
{
37+
$this->syncMode[$eventName] = true;
38+
}
39+
40+
public function onEvent(Event $event, $eventName)
41+
{
42+
if (false == isset($this->syncMode[$eventName])) {
43+
$message = $this->registry->getTransformer($eventName)->toMessage($eventName, $event);
44+
$message->setProperty('event_name', $eventName);
45+
46+
$this->producer->send('symfony_events', $message);
47+
}
48+
}
49+
}

Events/AsyncProcessor.php

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
<?php
2+
3+
namespace Enqueue\Bundle\Events;
4+
5+
use Enqueue\Client\TopicSubscriberInterface;
6+
use Enqueue\Psr\PsrContext;
7+
use Enqueue\Psr\PsrMessage;
8+
use Enqueue\Psr\PsrProcessor;
9+
10+
class AsyncProcessor implements PsrProcessor, TopicSubscriberInterface
11+
{
12+
/**
13+
* @var Registry
14+
*/
15+
private $registry;
16+
17+
/**
18+
* @var ProxyEventDispatcher
19+
*/
20+
private $eventDispatcher;
21+
22+
/**
23+
* @param Registry $registry
24+
* @param ProxyEventDispatcher $eventDispatcher
25+
*/
26+
public function __construct(Registry $registry, ProxyEventDispatcher $eventDispatcher)
27+
{
28+
$this->registry = $registry;
29+
$this->eventDispatcher = $eventDispatcher;
30+
}
31+
32+
/**
33+
* {@inheritdoc}
34+
*/
35+
public function process(PsrMessage $message, PsrContext $context)
36+
{
37+
if (false == $eventName = $message->getProperty('event_name')) {
38+
return self::REJECT;
39+
}
40+
41+
// TODO set transformer's name explicitly when sending a message.
42+
43+
$event = $this->registry->getTransformer($eventName)->toEvent($eventName, $message);
44+
45+
$this->eventDispatcher->syncMode($eventName);
46+
$this->eventDispatcher->dispatch($eventName, $event);
47+
48+
return self::ACK;
49+
}
50+
51+
/**
52+
* {@inheritdoc}
53+
*/
54+
public static function getSubscribedTopics()
55+
{
56+
return ['symfony_events'];
57+
}
58+
}

Events/ContainerAwareRegistry.php

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
<?php
2+
3+
namespace Enqueue\Bundle\Events;
4+
5+
use Symfony\Component\DependencyInjection\ContainerAwareInterface;
6+
use Symfony\Component\DependencyInjection\ContainerAwareTrait;
7+
8+
class ContainerAwareRegistry implements Registry, ContainerAwareInterface
9+
{
10+
use ContainerAwareTrait;
11+
12+
/**
13+
* @var string[]
14+
*/
15+
private $transformersMap;
16+
17+
/**
18+
* @param string[] $transformersMap
19+
*/
20+
public function __construct(array $transformersMap)
21+
{
22+
$this->transformersMap = $transformersMap;
23+
}
24+
25+
/**
26+
* {@inheritdoc}
27+
*/
28+
public function getTransformer($eventName)
29+
{
30+
if (false == array_key_exists($eventName, $this->transformersMap)) {
31+
throw new \LogicException(sprintf('There is no transformer registered for the given event %s', $eventName));
32+
}
33+
34+
// TODO add check container returns instance of EventTransformer interface.
35+
36+
return $this->container->get($this->transformersMap[$eventName]);
37+
}
38+
}

Events/EventTransformer.php

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
<?php
2+
3+
namespace Enqueue\Bundle\Events;
4+
5+
use Enqueue\Client\Message;
6+
use Enqueue\Psr\PsrMessage;
7+
use Symfony\Component\EventDispatcher\Event;
8+
9+
interface EventTransformer
10+
{
11+
/**
12+
* @param string $eventName
13+
* @param Event|null $event
14+
*
15+
* @return Message
16+
*/
17+
public function toMessage($eventName, Event $event = null);
18+
19+
/**
20+
* @param string $eventName
21+
* @param PsrMessage $message
22+
*
23+
* @return Event|null
24+
*/
25+
public function toEvent($eventName, PsrMessage $message);
26+
}

Events/ProxyEventDispatcher.php

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
<?php
2+
3+
namespace Enqueue\Bundle\Events;
4+
5+
use Symfony\Component\EventDispatcher\Event;
6+
use Symfony\Component\EventDispatcher\EventDispatcher;
7+
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
8+
9+
class ProxyEventDispatcher extends EventDispatcher
10+
{
11+
/**
12+
* @var EventDispatcherInterface
13+
*/
14+
private $trueEventDispatcher;
15+
16+
/**
17+
* @var AsyncListener
18+
*/
19+
private $asyncListener;
20+
21+
/**
22+
* @param EventDispatcherInterface $trueEventDispatcher
23+
* @param AsyncListener $asyncListener
24+
*/
25+
public function __construct(EventDispatcherInterface $trueEventDispatcher, AsyncListener $asyncListener)
26+
{
27+
$this->trueEventDispatcher = $trueEventDispatcher;
28+
$this->asyncListener = $asyncListener;
29+
}
30+
31+
/**
32+
* @param string $eventName
33+
*/
34+
public function syncMode($eventName)
35+
{
36+
$this->asyncListener->syncMode($eventName);
37+
}
38+
39+
/**
40+
* {@inheritdoc}
41+
*/
42+
public function dispatch($eventName, Event $event = null)
43+
{
44+
parent::dispatch($eventName, $event);
45+
$this->trueEventDispatcher->dispatch($eventName, $event);
46+
}
47+
}

0 commit comments

Comments
 (0)