Skip to content

Commit 109bd21

Browse files
committed
Merge branch 'master' into 0.9
2 parents a6462c7 + f4ba0a8 commit 109bd21

File tree

9 files changed

+110
-6
lines changed

9 files changed

+110
-6
lines changed

DependencyInjection/Compiler/ExtractProcessorTagSubscriptionsTrait.php

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,16 @@ protected function extractSubscriptions(ContainerBuilder $container, $processorS
8383

8484
if (is_subclass_of($processorClass, TopicSubscriberInterface::class)) {
8585
/** @var TopicSubscriberInterface $processorClass */
86-
foreach ($processorClass::getSubscribedTopics() as $topicName => $params) {
86+
$topics = $processorClass::getSubscribedTopics();
87+
if (!is_array($topics)) {
88+
throw new \LogicException(sprintf(
89+
'Topic subscriber configuration is invalid for "%s::getSubscribedTopics()": expected array, got %s.',
90+
$processorClass,
91+
gettype($topics)
92+
));
93+
}
94+
95+
foreach ($topics as $topicName => $params) {
8796
if (is_string($params)) {
8897
$data[] = [
8998
'topicName' => $params,
@@ -102,7 +111,8 @@ protected function extractSubscriptions(ContainerBuilder $container, $processorS
102111
];
103112
} else {
104113
throw new \LogicException(sprintf(
105-
'Topic subscriber configuration is invalid. "%s"',
114+
'Topic subscriber configuration is invalid for "%s::getSubscribedTopics()". "%s"',
115+
$processorClass,
106116
json_encode($processorClass::getSubscribedTopics())
107117
));
108118
}

DependencyInjection/EnqueueExtension.php

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
namespace Enqueue\Bundle\DependencyInjection;
44

55
use Enqueue\AsyncEventDispatcher\DependencyInjection\AsyncEventDispatcherExtension;
6+
use Enqueue\Client\CommandSubscriberInterface;
67
use Enqueue\Client\Producer;
8+
use Enqueue\Client\TopicSubscriberInterface;
79
use Enqueue\Client\TraceableProducer;
810
use Enqueue\Consumption\QueueConsumer;
911
use Enqueue\JobQueue\Job;
@@ -72,6 +74,8 @@ public function load(array $configs, ContainerBuilder $container)
7274
$loader = new YamlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config'));
7375
$loader->load('services.yml');
7476

77+
$this->setupAutowiringForProcessors($container);
78+
7579
foreach ($config['transport'] as $name => $transportConfig) {
7680
$this->factories[$name]->createConnectionFactory($container, $transportConfig);
7781
$this->factories[$name]->createContext($container, $transportConfig);
@@ -220,4 +224,19 @@ private function registerJobQueueDoctrineEntityMapping(ContainerBuilder $contain
220224
}
221225
}
222226
}
227+
228+
private function setupAutowiringForProcessors(ContainerBuilder $container)
229+
{
230+
if (!method_exists($container, 'registerForAutoconfiguration')) {
231+
return;
232+
}
233+
234+
$container->registerForAutoconfiguration(TopicSubscriberInterface::class)
235+
->setPublic(true)
236+
->addTag('enqueue.client.processor');
237+
238+
$container->registerForAutoconfiguration(CommandSubscriberInterface::class)
239+
->setPublic(true)
240+
->addTag('enqueue.client.processor');
241+
}
223242
}

EnqueueBundle.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121
use Enqueue\Fs\Symfony\FsTransportFactory;
2222
use Enqueue\Gps\GpsConnectionFactory;
2323
use Enqueue\Gps\Symfony\GpsTransportFactory;
24+
use Enqueue\Mongodb\Symfony\MongodbTransportFactory;
25+
use Enqueue\RdKafka\RdKafkaConnectionFactory;
26+
use Enqueue\RdKafka\Symfony\RdKafkaTransportFactory;
2427
use Enqueue\Redis\RedisConnectionFactory;
2528
use Enqueue\Redis\Symfony\RedisTransportFactory;
2629
use Enqueue\Sqs\SqsConnectionFactory;
@@ -104,6 +107,18 @@ class_exists(AmqpLibConnectionFactory::class)
104107
$extension->setTransportFactory(new MissingTransportFactory('gps', ['enqueue/gps']));
105108
}
106109

110+
if (class_exists(RdKafkaConnectionFactory::class)) {
111+
$extension->setTransportFactory(new RdKafkaTransportFactory('rdkafka'));
112+
} else {
113+
$extension->setTransportFactory(new MissingTransportFactory('rdkafka', ['enqueue/rdkafka']));
114+
}
115+
116+
if (class_exists(MongodbTransportFactory::class)) {
117+
$extension->setTransportFactory(new MongodbTransportFactory('mongodb'));
118+
} else {
119+
$extension->setTransportFactory(new MissingTransportFactory('mongodb', ['enqueue/mongodb']));
120+
}
121+
107122
$container->addCompilerPass(new AsyncEventsPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
108123
$container->addCompilerPass(new AsyncTransformersPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
109124
}

Tests/Functional/App/config/custom-config.yml

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
parameters:
22
locale: 'en'
33
secret: 'ThisTokenIsNotSoSecretChangeIt'
4-
4+
env(AWS_SQS_REGION): 'us-east-1'
5+
env(AWS_SQS_VERSION): 'latest'
6+
env(AWS_SQS_KEY): 'key'
7+
env(AWS_SQS_SECRET): 'secret'
8+
env(AWS_SQS_ENDPOINT): 'http://localstack:4576'
59

610
framework:
711
#esi: ~
@@ -33,3 +37,15 @@ services:
3337
public: true
3438
tags:
3539
- { name: 'enqueue.client.processor' }
40+
41+
test.sqs_client:
42+
public: true
43+
class: Aws\Sqs\SqsClient
44+
arguments:
45+
-
46+
endpoint: '%env(AWS_SQS_ENDPOINT)%'
47+
region: '%env(AWS_SQS_REGION)%'
48+
version: '%env(AWS_SQS_VERSION)%'
49+
credentials:
50+
key: '%env(AWS_SQS_KEY)%'
51+
secret: '%env(AWS_SQS_SECRET)%'

Tests/Functional/UseCasesTest.php

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,11 +201,28 @@ public function provideEnqueueConfigs()
201201
'key' => getenv('AWS_SQS_KEY'),
202202
'secret' => getenv('AWS_SQS_SECRET'),
203203
'region' => getenv('AWS_SQS_REGION'),
204+
'endpoint' => getenv('AWS_SQS_ENDPOINT'),
205+
],
206+
],
207+
]];
208+
209+
yield 'sqs_client' => [[
210+
'transport' => [
211+
'default' => 'sqs',
212+
'sqs' => [
213+
'client' => 'test.sqs_client',
204214
],
205215
],
206216
]];
207217
}
208218

219+
yield 'mongodb_dsn' => [[
220+
'transport' => [
221+
'default' => 'mongodb',
222+
'mongodb' => getenv('MONGO_DSN'),
223+
],
224+
]];
225+
209226
// yield 'gps' => [[
210227
// 'transport' => [
211228
// 'default' => 'gps',

Tests/Unit/DependencyInjection/Compiler/BuildClientRoutingPassTest.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ public function testShouldThrowExceptionWhenTopicSubscriberConfigurationIsInvali
254254
$pass = new BuildClientRoutingPass();
255255

256256
$this->expectException(\LogicException::class);
257-
$this->expectExceptionMessage('Topic subscriber configuration is invalid. "[12345]"');
257+
$this->expectExceptionMessage('Topic subscriber configuration is invalid');
258258

259259
$pass->process($container);
260260
}

Tests/Unit/DependencyInjection/Compiler/BuildProcessorRegistryPassTest.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ public function testShouldBuildRouteFromSubscriberIfProcessorNameSpecified()
180180
public function testShouldThrowExceptionWhenTopicSubscriberConfigurationIsInvalid()
181181
{
182182
$this->expectException(\LogicException::class);
183-
$this->expectExceptionMessage('Topic subscriber configuration is invalid. "[12345]"');
183+
$this->expectExceptionMessage('Topic subscriber configuration is invalid');
184184

185185
$container = $this->createContainerBuilder();
186186

Tests/Unit/DependencyInjection/Compiler/BuildTopicMetaSubscribersPassTest.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ public function testShouldThrowExceptionWhenTopicSubscriberConfigurationIsInvali
302302
$pass = new BuildTopicMetaSubscribersPass();
303303

304304
$this->expectException(\LogicException::class);
305-
$this->expectExceptionMessage('Topic subscriber configuration is invalid. "[12345]"');
305+
$this->expectExceptionMessage('Topic subscriber configuration is invalid');
306306

307307
$pass->process($container);
308308
}

Tests/Unit/DependencyInjection/EnqueueExtensionTest.php

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@
66
use Enqueue\Bundle\DependencyInjection\EnqueueExtension;
77
use Enqueue\Bundle\Tests\Unit\Mocks\FooTransportFactory;
88
use Enqueue\Bundle\Tests\Unit\Mocks\TransportFactoryWithoutDriverFactory;
9+
use Enqueue\Client\CommandSubscriberInterface;
910
use Enqueue\Client\Producer;
1011
use Enqueue\Client\ProducerInterface;
12+
use Enqueue\Client\TopicSubscriberInterface;
1113
use Enqueue\Client\TraceableProducer;
1214
use Enqueue\Consumption\QueueConsumer;
1315
use Enqueue\JobQueue\JobRunner;
@@ -633,6 +635,31 @@ public function testShouldThrowIfPackageShouldBeInstalledToUseTransport()
633635
]], $container);
634636
}
635637

638+
public function testShouldLoadProcessAutoconfigureChildDefinition()
639+
{
640+
if (30300 >= Kernel::VERSION_ID) {
641+
$this->markTestSkipped('The autoconfigure feature is available since Symfony 3.3 version');
642+
}
643+
644+
$container = $this->getContainerBuilder(true);
645+
$extension = new EnqueueExtension();
646+
647+
$extension->load([[
648+
'client' => [],
649+
'transport' => [],
650+
]], $container);
651+
652+
$autoconfigured = $container->getAutoconfiguredInstanceof();
653+
654+
self::assertArrayHasKey(CommandSubscriberInterface::class, $autoconfigured);
655+
self::assertTrue($autoconfigured[CommandSubscriberInterface::class]->hasTag('enqueue.client.processor'));
656+
self::assertTrue($autoconfigured[CommandSubscriberInterface::class]->isPublic());
657+
658+
self::assertArrayHasKey(TopicSubscriberInterface::class, $autoconfigured);
659+
self::assertTrue($autoconfigured[TopicSubscriberInterface::class]->hasTag('enqueue.client.processor'));
660+
self::assertTrue($autoconfigured[TopicSubscriberInterface::class]->isPublic());
661+
}
662+
636663
/**
637664
* @param bool $debug
638665
*

0 commit comments

Comments
 (0)