diff --git a/DependencyInjection/MessengerPass.php b/DependencyInjection/MessengerPass.php index 2defd923..a044629e 100644 --- a/DependencyInjection/MessengerPass.php +++ b/DependencyInjection/MessengerPass.php @@ -219,7 +219,7 @@ private function guessHandledClasses(\ReflectionClass $handlerClass, string $ser throw new RuntimeException(sprintf('Invalid handler service "%s": type-hint of argument "$%s" in method "%s::__invoke()" must be a class , "%s" given.', $serviceId, $parameters[0]->getName(), $handlerClass->getName(), $type)); } - return [(string) $parameters[0]->getType()]; + return [$parameters[0]->getType()->getName()]; } private function registerReceivers(ContainerBuilder $container, array $busIds) diff --git a/Tests/Transport/AmqpExt/ConnectionTest.php b/Tests/Transport/AmqpExt/ConnectionTest.php index e7dee842..a81f54c4 100644 --- a/Tests/Transport/AmqpExt/ConnectionTest.php +++ b/Tests/Transport/AmqpExt/ConnectionTest.php @@ -418,6 +418,21 @@ public function testObfuscatePasswordInDsn() $connection->channel(); } + public function testAmqpStampHeadersAreUsed() + { + $factory = new TestAmqpFactory( + $this->createMock(\AMQPConnection::class), + $this->createMock(\AMQPChannel::class), + $this->createMock(\AMQPQueue::class), + $amqpExchange = $this->createMock(\AMQPExchange::class) + ); + + $amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => ['Foo' => 'X', 'Bar' => 'Y']]); + + $connection = Connection::fromDsn('amqp://localhost', [], $factory); + $connection->publish('body', ['Foo' => 'X'], 0, new AmqpStamp(null, AMQP_NOPARAM, ['headers' => ['Bar' => 'Y']])); + } + public function testItCanPublishWithTheDefaultRoutingKey() { $factory = new TestAmqpFactory( diff --git a/Tests/Transport/Doctrine/ConnectionTest.php b/Tests/Transport/Doctrine/ConnectionTest.php index 83708c50..d2caf2df 100644 --- a/Tests/Transport/Doctrine/ConnectionTest.php +++ b/Tests/Transport/Doctrine/ConnectionTest.php @@ -157,61 +157,86 @@ private function getSchemaSynchronizerMock() /** * @dataProvider buildConfigurationProvider */ - public function testBuildConfiguration($dsn, $options, $expectedManager, $expectedTableName, $expectedRedeliverTimeout, $expectedQueue) + public function testBuildConfiguration($dsn, $options, $expectedConnection, $expectedTableName, $expectedRedeliverTimeout, $expectedQueue, $expectedAutoSetup) { $config = Connection::buildConfiguration($dsn, $options); - $this->assertEquals($expectedManager, $config['connection']); + $this->assertEquals($expectedConnection, $config['connection']); $this->assertEquals($expectedTableName, $config['table_name']); $this->assertEquals($expectedRedeliverTimeout, $config['redeliver_timeout']); $this->assertEquals($expectedQueue, $config['queue_name']); + $this->assertEquals($expectedAutoSetup, $config['auto_setup']); } public function buildConfigurationProvider() { - return [ - [ - 'dsn' => 'doctrine://default', - 'options' => [], - 'expectedManager' => 'default', - 'expectedTableName' => 'messenger_messages', - 'expectedRedeliverTimeout' => 3600, - 'expectedQueue' => 'default', - ], - // test options from options array - [ - 'dsn' => 'doctrine://default', - 'options' => [ - 'table_name' => 'name_from_options', - 'redeliver_timeout' => 1800, - 'queue_name' => 'important', - ], - 'expectedManager' => 'default', - 'expectedTableName' => 'name_from_options', - 'expectedRedeliverTimeout' => 1800, - 'expectedQueue' => 'important', - ], - // tests options from dsn - [ - 'dsn' => 'doctrine://default?table_name=name_from_dsn&redeliver_timeout=1200&queue_name=normal', - 'options' => [], - 'expectedManager' => 'default', - 'expectedTableName' => 'name_from_dsn', - 'expectedRedeliverTimeout' => 1200, - 'expectedQueue' => 'normal', + yield 'no options' => [ + 'dsn' => 'doctrine://default', + 'options' => [], + 'expectedConnection' => 'default', + 'expectedTableName' => 'messenger_messages', + 'expectedRedeliverTimeout' => 3600, + 'expectedQueue' => 'default', + 'expectedAutoSetup' => true, + ]; + + yield 'test options array' => [ + 'dsn' => 'doctrine://default', + 'options' => [ + 'table_name' => 'name_from_options', + 'redeliver_timeout' => 1800, + 'queue_name' => 'important', + 'auto_setup' => false, ], - // test options from options array wins over options from dsn - [ - 'dsn' => 'doctrine://default?table_name=name_from_dsn&redeliver_timeout=1200&queue_name=normal', - 'options' => [ - 'table_name' => 'name_from_options', - 'redeliver_timeout' => 1800, - 'queue_name' => 'important', - ], - 'expectedManager' => 'default', - 'expectedTableName' => 'name_from_options', - 'expectedRedeliverTimeout' => 1800, - 'expectedQueue' => 'important', + 'expectedConnection' => 'default', + 'expectedTableName' => 'name_from_options', + 'expectedRedeliverTimeout' => 1800, + 'expectedQueue' => 'important', + 'expectedAutoSetup' => false, + ]; + + yield 'options from dsn' => [ + 'dsn' => 'doctrine://default?table_name=name_from_dsn&redeliver_timeout=1200&queue_name=normal&auto_setup=false', + 'options' => [], + 'expectedConnection' => 'default', + 'expectedTableName' => 'name_from_dsn', + 'expectedRedeliverTimeout' => 1200, + 'expectedQueue' => 'normal', + 'expectedAutoSetup' => false, + ]; + + yield 'options from options array wins over options from dsn' => [ + 'dsn' => 'doctrine://default?table_name=name_from_dsn&redeliver_timeout=1200&queue_name=normal&auto_setup=true', + 'options' => [ + 'table_name' => 'name_from_options', + 'redeliver_timeout' => 1800, + 'queue_name' => 'important', + 'auto_setup' => false, ], + 'expectedConnection' => 'default', + 'expectedTableName' => 'name_from_options', + 'expectedRedeliverTimeout' => 1800, + 'expectedQueue' => 'important', + 'expectedAutoSetup' => false, + ]; + + yield 'options from dsn with falsey boolean' => [ + 'dsn' => 'doctrine://default?auto_setup=0', + 'options' => [], + 'expectedConnection' => 'default', + 'expectedTableName' => 'messenger_messages', + 'expectedRedeliverTimeout' => 3600, + 'expectedQueue' => 'default', + 'expectedAutoSetup' => false, + ]; + + yield 'options from dsn with thruthy boolean' => [ + 'dsn' => 'doctrine://default?auto_setup=1', + 'options' => [], + 'expectedConnection' => 'default', + 'expectedTableName' => 'messenger_messages', + 'expectedRedeliverTimeout' => 3600, + 'expectedQueue' => 'default', + 'expectedAutoSetup' => true, ]; } diff --git a/Transport/AmqpExt/AmqpStamp.php b/Transport/AmqpExt/AmqpStamp.php index 3d5290d8..e492de96 100644 --- a/Transport/AmqpExt/AmqpStamp.php +++ b/Transport/AmqpExt/AmqpStamp.php @@ -11,13 +11,13 @@ namespace Symfony\Component\Messenger\Transport\AmqpExt; -use Symfony\Component\Messenger\Stamp\StampInterface; +use Symfony\Component\Messenger\Stamp\NonSendableStampInterface; /** * @author Guillaume Gammelin * @author Samuel Roze */ -final class AmqpStamp implements StampInterface +final class AmqpStamp implements NonSendableStampInterface { private $routingKey; private $flags; diff --git a/Transport/AmqpExt/Connection.php b/Transport/AmqpExt/Connection.php index e77366fe..5164ac6a 100644 --- a/Transport/AmqpExt/Connection.php +++ b/Transport/AmqpExt/Connection.php @@ -189,9 +189,7 @@ public function publish(string $body, array $headers = [], int $delay = 0, AmqpS $this->exchange(), $body, $this->getRoutingKeyForMessage($amqpStamp), - [ - 'headers' => $headers, - ], + $headers, $amqpStamp ); } @@ -221,20 +219,21 @@ private function publishWithDelay(string $body, array $headers, int $delay, Amqp $this->getDelayExchange(), $body, $this->getRoutingKeyForDelay($delay, $routingKey), - [ - 'headers' => $headers, - ], + $headers, $amqpStamp ); } - private function publishOnExchange(\AMQPExchange $exchange, string $body, string $routingKey = null, array $attributes = [], AmqpStamp $amqpStamp = null) + private function publishOnExchange(\AMQPExchange $exchange, string $body, string $routingKey = null, array $headers = [], AmqpStamp $amqpStamp = null) { + $attributes = $amqpStamp ? $amqpStamp->getAttributes() : []; + $attributes['headers'] = array_merge($headers, $attributes['headers'] ?? []); + $exchange->publish( $body, $routingKey, $amqpStamp ? $amqpStamp->getFlags() : AMQP_NOPARAM, - array_merge($amqpStamp ? $amqpStamp->getAttributes() : [], $attributes) + $attributes ); } diff --git a/Transport/Doctrine/Connection.php b/Transport/Doctrine/Connection.php index 2a0527ca..5527aaf5 100644 --- a/Transport/Doctrine/Connection.php +++ b/Transport/Doctrine/Connection.php @@ -74,22 +74,19 @@ public static function buildConfiguration($dsn, array $options = []): array parse_str($components['query'], $query); } - $configuration = [ - 'connection' => $components['host'], - 'table_name' => $options['table_name'] ?? ($query['table_name'] ?? self::DEFAULT_OPTIONS['table_name']), - 'queue_name' => $options['queue_name'] ?? ($query['queue_name'] ?? self::DEFAULT_OPTIONS['queue_name']), - 'redeliver_timeout' => $options['redeliver_timeout'] ?? ($query['redeliver_timeout'] ?? self::DEFAULT_OPTIONS['redeliver_timeout']), - 'auto_setup' => $options['auto_setup'] ?? ($query['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup']), - ]; + $configuration = ['connection' => $components['host']]; + $configuration += $options + $query + self::DEFAULT_OPTIONS; + + $configuration['auto_setup'] = filter_var($configuration['auto_setup'], FILTER_VALIDATE_BOOLEAN); // check for extra keys in options - $optionsExtraKeys = array_diff(array_keys($options), array_keys($configuration)); + $optionsExtraKeys = array_diff(array_keys($options), array_keys(self::DEFAULT_OPTIONS)); if (0 < \count($optionsExtraKeys)) { throw new InvalidArgumentException(sprintf('Unknown option found : [%s]. Allowed options are [%s]', implode(', ', $optionsExtraKeys), implode(', ', self::DEFAULT_OPTIONS))); } // check for extra keys in options - $queryExtraKeys = array_diff(array_keys($query), array_keys($configuration)); + $queryExtraKeys = array_diff(array_keys($query), array_keys(self::DEFAULT_OPTIONS)); if (0 < \count($queryExtraKeys)) { throw new InvalidArgumentException(sprintf('Unknown option found in DSN: [%s]. Allowed options are [%s]', implode(', ', $queryExtraKeys), implode(', ', self::DEFAULT_OPTIONS))); }