Skip to content

Commit

Permalink
Merge pull request #2 from tienvx/filter-envelope-by-transport
Browse files Browse the repository at this point in the history
Allow filter envelope by transport
  • Loading branch information
tienvx authored Mar 11, 2023
2 parents e0268d6 + 4d0c056 commit fbf1c0b
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 6 deletions.
5 changes: 4 additions & 1 deletion src/EventListener/SendMessageToTransportsEventListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ public function __construct(private EnvelopeCollectorInterface $collector)

public function __invoke(SendMessageToTransportsEvent $event): void
{
$this->collector->collect($event->getEnvelope());
$this->collector->collect(
$event->getEnvelope(),
method_exists($event, 'getSenders') ? array_keys($event->getSenders()) : null
);
}
}
7 changes: 7 additions & 0 deletions src/Exception/LogicException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<?php

namespace Tienvx\Bundle\PactMessengerBundle\Exception;

class LogicException extends \LogicException
{
}
21 changes: 18 additions & 3 deletions src/Service/EnvelopeCollector.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,41 @@
namespace Tienvx\Bundle\PactMessengerBundle\Service;

use Symfony\Component\Messenger\Envelope;
use Tienvx\Bundle\PactMessengerBundle\Exception\LogicException;

class EnvelopeCollector implements EnvelopeCollectorInterface
{
/** @var array<int, Envelope> */
private array $envelopes;

public function collect(Envelope $envelope): void
/** @var array<int, string[]|null> */
private array $transportsByEnvelope;

public function collect(Envelope $envelope, ?array $transports = null): void
{
$this->envelopes[] = $envelope;
$this->transportsByEnvelope[spl_object_id($envelope)] = $transports;
}

public function getAll(): array
{
return $this->envelopes;
}

public function getSingle(string $messageFqcn): ?Envelope
public function getSingle(string $messageFqcn, ?string $transport = null): ?Envelope
{
foreach ($this->envelopes as $envelope) {
if ($envelope->getMessage() instanceof $messageFqcn) {
if (!$envelope->getMessage() instanceof $messageFqcn) {
continue;
}
if (is_null($transport)) {
return $envelope;
}
$transports = $this->transportsByEnvelope[spl_object_id($envelope)];
if (is_null($transports)) {
throw new LogicException('You need to upgrade to Symfony >=6.2 to be able to filter envelope by transport.');
}
if (in_array($transport, $transports)) {
return $envelope;
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/Service/EnvelopeCollectorInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@

interface EnvelopeCollectorInterface
{
public function collect(Envelope $envelope): void;
public function collect(Envelope $envelope, ?array $transports = null): void;

public function getAll(): array;

public function getSingle(string $messageFqcn): ?Envelope;
public function getSingle(string $messageFqcn, ?string $transport = null): ?Envelope;
}

0 comments on commit fbf1c0b

Please sign in to comment.