Skip to content

Commit ff50ff3

Browse files
committed
Init commit.
0 parents  commit ff50ff3

File tree

70 files changed

+4309
-0
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

70 files changed

+4309
-0
lines changed

.gitignore

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
*~
2+
/composer.lock
3+
/composer.phar
4+
/phpunit.xml
5+
/vendor/
6+
/.idea/

.travis.yml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
sudo: false
2+
3+
git:
4+
depth: 1
5+
6+
language: php
7+
8+
php:
9+
- '5.6'
10+
- '7.0'
11+
12+
cache:
13+
directories:
14+
- $HOME/.composer/cache
15+
16+
install:
17+
- composer self-update
18+
- composer install --prefer-source --ignore-platform-reqs
19+
20+
script:
21+
- vendor/bin/phpunit --exclude-group=functional
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
<?php
2+
namespace Enqueue\EnqueueBundle\Consumption\Extension;
3+
4+
use Enqueue\Consumption\Context;
5+
use Enqueue\Consumption\EmptyExtensionTrait;
6+
use Enqueue\Consumption\ExtensionInterface;
7+
use Symfony\Bridge\Doctrine\RegistryInterface;
8+
9+
class DoctrineClearIdentityMapExtension implements ExtensionInterface
10+
{
11+
use EmptyExtensionTrait;
12+
13+
/**
14+
* @var RegistryInterface
15+
*/
16+
protected $registry;
17+
18+
/**
19+
* @param RegistryInterface $registry
20+
*/
21+
public function __construct(RegistryInterface $registry)
22+
{
23+
$this->registry = $registry;
24+
}
25+
26+
/**
27+
* {@inheritdoc}
28+
*/
29+
public function onPreReceived(Context $context)
30+
{
31+
foreach ($this->registry->getManagers() as $name => $manager) {
32+
$context->getLogger()->debug(sprintf(
33+
'[DoctrineClearIdentityMapExtension] Clear identity map for manager "%s"',
34+
$name
35+
));
36+
37+
$manager->clear();
38+
}
39+
}
40+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
<?php
2+
namespace Enqueue\EnqueueBundle\Consumption\Extension;
3+
4+
use Doctrine\DBAL\Connection;
5+
use Enqueue\Consumption\Context;
6+
use Enqueue\Consumption\EmptyExtensionTrait;
7+
use Enqueue\Consumption\ExtensionInterface;
8+
use Symfony\Bridge\Doctrine\RegistryInterface;
9+
10+
class DoctrinePingConnectionExtension implements ExtensionInterface
11+
{
12+
use EmptyExtensionTrait;
13+
14+
/**
15+
* @var RegistryInterface
16+
*/
17+
protected $registry;
18+
19+
/**
20+
* @param RegistryInterface $registry
21+
*/
22+
public function __construct(RegistryInterface $registry)
23+
{
24+
$this->registry = $registry;
25+
}
26+
27+
/**
28+
* {@inheritdoc}
29+
*/
30+
public function onPreReceived(Context $context)
31+
{
32+
/** @var Connection $connection */
33+
foreach ($this->registry->getConnections() as $connection) {
34+
if ($connection->ping()) {
35+
return;
36+
}
37+
38+
$context->getLogger()->debug(
39+
'[DoctrinePingConnectionExtension] Connection is not active trying to reconnect.'
40+
);
41+
42+
$connection->close();
43+
$connection->connect();
44+
45+
$context->getLogger()->debug(
46+
'[DoctrinePingConnectionExtension] Connection is active now.'
47+
);
48+
}
49+
}
50+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
<?php
2+
namespace Enqueue\EnqueueBundle\DependencyInjection\Compiler;
3+
4+
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
5+
use Symfony\Component\DependencyInjection\ContainerBuilder;
6+
7+
class AddTopicMetaPass implements CompilerPassInterface
8+
{
9+
/**
10+
* @var array
11+
*/
12+
private $topicsMeta;
13+
14+
public function __construct()
15+
{
16+
$this->topicsMeta = [];
17+
}
18+
19+
/**
20+
* @param string $topicName
21+
* @param string $topicDescription
22+
* @param array $topicSubscribers
23+
*
24+
* @return $this
25+
*/
26+
public function add($topicName, $topicDescription = '', array $topicSubscribers = [])
27+
{
28+
$this->topicsMeta[$topicName] = [];
29+
30+
if ($topicDescription) {
31+
$this->topicsMeta[$topicName]['description'] = $topicDescription;
32+
}
33+
34+
if ($topicSubscribers) {
35+
$this->topicsMeta[$topicName]['processors'] = $topicSubscribers;
36+
}
37+
38+
return $this;
39+
}
40+
41+
/**
42+
* {@inheritdoc}
43+
*/
44+
public function process(ContainerBuilder $container)
45+
{
46+
$metaRegistryId = 'enqueue.client.meta.topic_meta_registry';
47+
48+
if (false == $container->hasDefinition($metaRegistryId)) {
49+
return;
50+
}
51+
52+
$metaRegistry = $container->getDefinition($metaRegistryId);
53+
54+
$metaRegistry->replaceArgument(0, array_merge_recursive($metaRegistry->getArgument(0), $this->topicsMeta));
55+
}
56+
57+
/**
58+
* @return static
59+
*/
60+
public static function create()
61+
{
62+
return new static();
63+
}
64+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
<?php
2+
namespace Enqueue\EnqueueBundle\DependencyInjection\Compiler;
3+
4+
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
5+
use Symfony\Component\DependencyInjection\ContainerBuilder;
6+
7+
class BuildClientRoutingPass implements CompilerPassInterface
8+
{
9+
use ExtractMessageProcessorTagSubscriptionsTrait;
10+
11+
/**
12+
* {@inheritdoc}
13+
*/
14+
public function process(ContainerBuilder $container)
15+
{
16+
$processorTagName = 'enqueue.client.message_processor';
17+
$routerId = 'enqueue.client.router_processor';
18+
19+
if (false == $container->hasDefinition($routerId)) {
20+
return;
21+
}
22+
23+
$configs = [];
24+
foreach ($container->findTaggedServiceIds($processorTagName) as $serviceId => $tagAttributes) {
25+
$subscriptions = $this->extractSubscriptions($container, $serviceId, $tagAttributes);
26+
27+
foreach ($subscriptions as $subscription) {
28+
$configs[$subscription['topicName']][] = [
29+
$subscription['processorName'],
30+
$subscription['queueName'],
31+
];
32+
}
33+
}
34+
35+
$router = $container->getDefinition($routerId);
36+
$router->replaceArgument(1, $configs);
37+
}
38+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
<?php
2+
namespace Enqueue\EnqueueBundle\DependencyInjection\Compiler;
3+
4+
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
5+
use Symfony\Component\DependencyInjection\ContainerBuilder;
6+
use Symfony\Component\DependencyInjection\Reference;
7+
8+
class BuildExtensionsPass implements CompilerPassInterface
9+
{
10+
/**
11+
* {@inheritdoc}
12+
*/
13+
public function process(ContainerBuilder $container)
14+
{
15+
$tags = $container->findTaggedServiceIds('enqueue.consumption.extension');
16+
17+
$groupByPriority = [];
18+
foreach ($tags as $serviceId => $tagAttributes) {
19+
foreach ($tagAttributes as $tagAttribute) {
20+
$priority = isset($tagAttribute['priority']) ? (int) $tagAttribute['priority'] : 0;
21+
22+
$groupByPriority[$priority][] = new Reference($serviceId);
23+
}
24+
}
25+
26+
ksort($groupByPriority);
27+
28+
$flatExtensions = [];
29+
foreach ($groupByPriority as $extension) {
30+
$flatExtensions = array_merge($flatExtensions, $extension);
31+
}
32+
33+
$container->getDefinition('enqueue.consumption.extensions')->replaceArgument(0, $flatExtensions);
34+
}
35+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
<?php
2+
namespace Enqueue\EnqueueBundle\DependencyInjection\Compiler;
3+
4+
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
5+
use Symfony\Component\DependencyInjection\ContainerBuilder;
6+
7+
class BuildMessageProcessorRegistryPass implements CompilerPassInterface
8+
{
9+
use ExtractMessageProcessorTagSubscriptionsTrait;
10+
11+
/**
12+
* {@inheritdoc}
13+
*/
14+
public function process(ContainerBuilder $container)
15+
{
16+
$processorTagName = 'enqueue.client.message_processor';
17+
$processorRegistryId = 'enqueue.client.message_processor_registry';
18+
19+
if (false == $container->hasDefinition($processorRegistryId)) {
20+
return;
21+
}
22+
23+
$processorIds = [];
24+
foreach ($container->findTaggedServiceIds($processorTagName) as $serviceId => $tagAttributes) {
25+
$subscriptions = $this->extractSubscriptions($container, $serviceId, $tagAttributes);
26+
27+
foreach ($subscriptions as $subscription) {
28+
$processorIds[$subscription['processorName']] = $serviceId;
29+
}
30+
}
31+
32+
$processorRegistryDef = $container->getDefinition($processorRegistryId);
33+
$processorRegistryDef->setArguments([$processorIds]);
34+
}
35+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
<?php
2+
namespace Enqueue\EnqueueBundle\DependencyInjection\Compiler;
3+
4+
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
5+
use Symfony\Component\DependencyInjection\ContainerBuilder;
6+
7+
class BuildQueueMetaRegistryPass implements CompilerPassInterface
8+
{
9+
use ExtractMessageProcessorTagSubscriptionsTrait;
10+
11+
/**
12+
* {@inheritdoc}
13+
*/
14+
public function process(ContainerBuilder $container)
15+
{
16+
$processorTagName = 'enqueue.client.message_processor';
17+
$queueMetaRegistryId = 'enqueue.client.meta.queue_meta_registry';
18+
if (false == $container->hasDefinition($queueMetaRegistryId)) {
19+
return;
20+
}
21+
22+
$queueMetaRegistry = $container->getDefinition($queueMetaRegistryId);
23+
24+
$configs = [];
25+
foreach ($container->findTaggedServiceIds($processorTagName) as $serviceId => $tagAttributes) {
26+
$subscriptions = $this->extractSubscriptions($container, $serviceId, $tagAttributes);
27+
28+
foreach ($subscriptions as $subscription) {
29+
$configs[$subscription['queueName']]['processors'][] = $subscription['processorName'];
30+
}
31+
}
32+
33+
$queueMetaRegistry->replaceArgument(1, $configs);
34+
}
35+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
<?php
2+
namespace Enqueue\EnqueueBundle\DependencyInjection\Compiler;
3+
4+
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
5+
use Symfony\Component\DependencyInjection\ContainerBuilder;
6+
7+
class BuildTopicMetaSubscribersPass implements CompilerPassInterface
8+
{
9+
use ExtractMessageProcessorTagSubscriptionsTrait;
10+
11+
/**
12+
* {@inheritdoc}
13+
*/
14+
public function process(ContainerBuilder $container)
15+
{
16+
$processorTagName = 'enqueue.client.message_processor';
17+
18+
$topicsSubscribers = [];
19+
foreach ($container->findTaggedServiceIds($processorTagName) as $serviceId => $tagAttributes) {
20+
$subscriptions = $this->extractSubscriptions($container, $serviceId, $tagAttributes);
21+
22+
foreach ($subscriptions as $subscription) {
23+
$topicsSubscribers[$subscription['topicName']][] = $subscription['processorName'];
24+
}
25+
}
26+
27+
$addTopicMetaPass = AddTopicMetaPass::create();
28+
foreach ($topicsSubscribers as $topicName => $subscribers) {
29+
$addTopicMetaPass->add($topicName, '', $subscribers);
30+
}
31+
32+
$addTopicMetaPass->process($container);
33+
}
34+
}

0 commit comments

Comments
 (0)