diff --git a/Tests/Transport/AmqpExt/AmqpStampTest.php b/Tests/Transport/AmqpExt/AmqpStampTest.php index d9605808..043dfb2e 100644 --- a/Tests/Transport/AmqpExt/AmqpStampTest.php +++ b/Tests/Transport/AmqpExt/AmqpStampTest.php @@ -34,4 +34,40 @@ public function testFlagsAndAttributes() $this->assertSame(AMQP_DURABLE, $stamp->getFlags()); $this->assertSame(['delivery_mode' => 'unknown'], $stamp->getAttributes()); } + + public function testCreateFromAmqpEnvelope() + { + $amqpEnvelope = $this->createMock(\AMQPEnvelope::class); + $amqpEnvelope->method('getRoutingKey')->willReturn('routingkey'); + $amqpEnvelope->method('getDeliveryMode')->willReturn(2); + $amqpEnvelope->method('getPriority')->willReturn(5); + $amqpEnvelope->method('getAppId')->willReturn('appid'); + + $stamp = AmqpStamp::createFromAmqpEnvelope($amqpEnvelope); + + $this->assertSame($amqpEnvelope->getRoutingKey(), $stamp->getRoutingKey()); + $this->assertSame($amqpEnvelope->getDeliveryMode(), $stamp->getAttributes()['delivery_mode']); + $this->assertSame($amqpEnvelope->getPriority(), $stamp->getAttributes()['priority']); + $this->assertSame($amqpEnvelope->getAppId(), $stamp->getAttributes()['app_id']); + $this->assertSame(AMQP_NOPARAM, $stamp->getFlags()); + } + + public function testCreateFromAmqpEnvelopeWithPreviousStamp() + { + $amqpEnvelope = $this->createMock(\AMQPEnvelope::class); + $amqpEnvelope->method('getRoutingKey')->willReturn('routingkey'); + $amqpEnvelope->method('getDeliveryMode')->willReturn(2); + $amqpEnvelope->method('getPriority')->willReturn(5); + $amqpEnvelope->method('getAppId')->willReturn('appid'); + + $previousStamp = new AmqpStamp('otherroutingkey', AMQP_MANDATORY, ['priority' => 8]); + + $stamp = AmqpStamp::createFromAmqpEnvelope($amqpEnvelope, $previousStamp); + + $this->assertSame('otherroutingkey', $stamp->getRoutingKey()); + $this->assertSame($amqpEnvelope->getDeliveryMode(), $stamp->getAttributes()['delivery_mode']); + $this->assertSame(8, $stamp->getAttributes()['priority']); + $this->assertSame($amqpEnvelope->getAppId(), $stamp->getAttributes()['app_id']); + $this->assertSame(AMQP_MANDATORY, $stamp->getFlags()); + } } diff --git a/Transport/AmqpExt/AmqpSender.php b/Transport/AmqpExt/AmqpSender.php index b057aeaf..ae99759c 100644 --- a/Transport/AmqpExt/AmqpSender.php +++ b/Transport/AmqpExt/AmqpSender.php @@ -45,20 +45,22 @@ public function send(Envelope $envelope): Envelope $delayStamp = $envelope->last(DelayStamp::class); $delay = $delayStamp ? $delayStamp->getDelay() : 0; + /** @var AmqpStamp|null $amqpStamp */ $amqpStamp = $envelope->last(AmqpStamp::class); if (isset($encodedMessage['headers']['Content-Type'])) { $contentType = $encodedMessage['headers']['Content-Type']; unset($encodedMessage['headers']['Content-Type']); - $attributes = $amqpStamp ? $amqpStamp->getAttributes() : []; - - if (!isset($attributes['content_type'])) { - $attributes['content_type'] = $contentType; - - $amqpStamp = new AmqpStamp($amqpStamp ? $amqpStamp->getRoutingKey() : null, $amqpStamp ? $amqpStamp->getFlags() : AMQP_NOPARAM, $attributes); + if (!$amqpStamp || !isset($amqpStamp->getAttributes()['content_type'])) { + $amqpStamp = AmqpStamp::createWithAttributes(['content_type' => $contentType], $amqpStamp); } } + $amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class); + if ($amqpReceivedStamp instanceof AmqpReceivedStamp) { + $amqpStamp = AmqpStamp::createFromAmqpEnvelope($amqpReceivedStamp->getAmqpEnvelope(), $amqpStamp); + } + try { $this->connection->publish( $encodedMessage['body'], diff --git a/Transport/AmqpExt/AmqpStamp.php b/Transport/AmqpExt/AmqpStamp.php index e492de96..0a4777cc 100644 --- a/Transport/AmqpExt/AmqpStamp.php +++ b/Transport/AmqpExt/AmqpStamp.php @@ -44,4 +44,33 @@ public function getAttributes(): array { return $this->attributes; } + + public static function createFromAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, self $previousStamp = null): self + { + $attr = $previousStamp->attributes ?? []; + + $attr['headers'] = $attr['headers'] ?? $amqpEnvelope->getHeaders(); + $attr['content_type'] = $attr['content_type'] ?? $amqpEnvelope->getContentType(); + $attr['content_encoding'] = $attr['content_encoding'] ?? $amqpEnvelope->getContentEncoding(); + $attr['delivery_mode'] = $attr['delivery_mode'] ?? $amqpEnvelope->getDeliveryMode(); + $attr['priority'] = $attr['priority'] ?? $amqpEnvelope->getPriority(); + $attr['timestamp'] = $attr['timestamp'] ?? $amqpEnvelope->getTimestamp(); + $attr['app_id'] = $attr['app_id'] ?? $amqpEnvelope->getAppId(); + $attr['message_id'] = $attr['message_id'] ?? $amqpEnvelope->getMessageId(); + $attr['user_id'] = $attr['user_id'] ?? $amqpEnvelope->getUserId(); + $attr['expiration'] = $attr['expiration'] ?? $amqpEnvelope->getExpiration(); + $attr['type'] = $attr['type'] ?? $amqpEnvelope->getType(); + $attr['reply_to'] = $attr['reply_to'] ?? $amqpEnvelope->getReplyTo(); + + return new self($previousStamp->routingKey ?? $amqpEnvelope->getRoutingKey(), $previousStamp->flags ?? AMQP_NOPARAM, $attr); + } + + public static function createWithAttributes(array $attributes, self $previousStamp = null): self + { + return new self( + $previousStamp->routingKey ?? null, + $previousStamp->flags ?? AMQP_NOPARAM, + array_merge($previousStamp->attributes ?? [], $attributes) + ); + } } diff --git a/Transport/AmqpExt/Connection.php b/Transport/AmqpExt/Connection.php index edb0be43..8e37e387 100644 --- a/Transport/AmqpExt/Connection.php +++ b/Transport/AmqpExt/Connection.php @@ -224,7 +224,7 @@ private function publishWithDelay(string $body, array $headers, int $delay, Amqp private function publishOnExchange(\AMQPExchange $exchange, string $body, string $routingKey = null, array $headers = [], AmqpStamp $amqpStamp = null) { $attributes = $amqpStamp ? $amqpStamp->getAttributes() : []; - $attributes['headers'] = array_merge($headers, $attributes['headers'] ?? []); + $attributes['headers'] = array_merge($attributes['headers'] ?? [], $headers); $exchange->publish( $body,