[QUESTION] Can't consume messages from third-party GPS publisher #130

Open
Axel29 opened this issue May 23, 2024 · 1 comment

Axel29 commented May 23, 2024

Hi, I'm trying to set up Google Pub/Sub (GPS) with Shopify and Symfony 6.4. Messages are being published to GPS (I can see them in the Google Cloud Console), but no matter which configuration I try, they are never consumed.

I've installed the following packages:

  1. sroze/messenger-enqueue-transport (version 0.8.0)
  2. enqueue/gps (version 0.10.19)

Here are my configuration files:

  • config/packages/messenger.yaml :
framework:
  messenger:
    default_bus: default

    failure_transport: failed
    transports:
      failed:
        dsn: "%env(MESSENGER_TRANSPORT_DSN)%failed"
        retry_strategy:
          max_retries: 4
          delay: 1000
        options:
          check_delayed_interval: 0

      shopify_webhook_queue:
        dsn: "enqueue://default"

    routing:
      App\Message\ShopifyWebhookMessage: shopify_webhook_queue
  • config/packages/enqueue.yaml :
enqueue:
    default:
        transport: '%env(ENQUEUE_DSN)%'
        client: ~
        consumption:
            receive_timeout: 300000
  • .env.dev.local :
ENQUEUE_DSN=gps:?projectId=project-id&keyFileName=/var/www/symfony/google-credentials.json&subscriptionName=subscription-name&topicName=topic-name&receiveTimeout=300000

Then I run the command php bin/console messenger:consume shopify_webhook_queue -vvv and nothing happens.


Additional information

  1. If I try to run the command php bin/console enqueue:consume --setup-broker -vvv, I get the following error:
[Google\Cloud\Core\Exception\ServiceException]
  cURL error 7: Failed to connect to localhost port 8900 after 0 ms: Couldn't connect to server (see https://curl.haxx.se/libcurl/c/libcurl-errors.html) for http://localhost:8900/v1/projects/vtwonen/topics/enqueue.default?prettyPrint=false

... but for my tests I'm trying to connect to an actual Google Pub/Sub project, not the emulator.
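
One thing that may be worth ruling out here: as far as I understand, the Google Cloud PHP client switches to the emulator when the PUBSUB_EMULATOR_HOST environment variable is set. That this variable is what points my setup at localhost:8900 is an assumption on my part, not something I have confirmed. A minimal sketch to check it from the same environment the console command runs in (describePubSubTarget is a hypothetical helper name, not part of any library):

```php
<?php
declare(strict_types=1);

// Hypothetical helper: describe which Pub/Sub endpoint would be targeted,
// ASSUMING the client honours the PUBSUB_EMULATOR_HOST environment variable.
function describePubSubTarget(string|false $emulatorHost): string
{
    // getenv() returns false when the variable is not defined at all.
    if ($emulatorHost === false || $emulatorHost === '') {
        return 'real Google Pub/Sub API';
    }

    return 'emulator at ' . $emulatorHost;
}

echo describePubSubTarget(getenv('PUBSUB_EMULATOR_HOST')), PHP_EOL;
```

If this reports an emulator address, unsetting the variable (for example in the .env files or the container definition) would be the first thing to try.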

  2. I've also tried creating a "Processor" class like the one below, but it didn't seem to change anything:
<?php
declare(strict_types=1);

namespace App\Queue;

use Enqueue\Client\TopicSubscriberInterface;
use Interop\Queue\Context;
use Interop\Queue\Message;
use Interop\Queue\Processor;
use Psr\Log\LoggerInterface;

class ShopifyWebhookProcessor implements Processor, TopicSubscriberInterface
{
    public function __construct(
        protected LoggerInterface $logger,
    ) {
    }

    public function process(Message $message, Context $context)
    {
        $this->logger->critical(__METHOD__ . '(' . __LINE__ . ')');
    }

    public static function getSubscribedTopics()
    {
        return ['enqueue.topics']; // Also tried to set the topic name from Google Cloud Console but didn't change anything.
    }
}
  3. I'm unsure which subscription name to use. Is it the one auto-generated by Google (the topic name suffixed with -sub), or is it the same as the topic name? For now, I've been using the name shown in the Google Cloud Console.

Thank you for your help.

Axel29 commented Jun 17, 2024

Hi again,

I've managed to get the consumer working, but ran into multiple issues.

The first is that the body this time is JSON rather than a serialized string, so I had to create a custom serializer in order to consume the messages.
The second is that, for some reason, I don't see any headers in the envelope. Shopify does seem to send them, but they end up in the "nativeMessage" -> "messages" -> "attributes" entry of the $interopMessage variable (in \Enqueue\MessengerAdapter\QueueInteropTransport::get). As a result, I can't dispatch my messages properly because I have no headers (and Enqueue throws errors because of that).
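
To make the second point concrete, this is roughly the mapping I would like to end up with: the Pub/Sub attributes copied into the 'headers' key of the array that a Messenger serializer's decode() receives. It is only a sketch on plain arrays. buildEncodedEnvelope is a hypothetical helper, the attribute name in the usage lines is illustrative, and where exactly this would hook into the Enqueue adapter is still the open question.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: build the ['body' => ..., 'headers' => ...] array that
 * Messenger's SerializerInterface::decode() expects, using the Pub/Sub
 * attributes as headers.
 *
 * ASSUMPTION: $attributes is the key/value map found under the native
 * message's "attributes" entry.
 */
function buildEncodedEnvelope(string $body, array $attributes, string $fallbackType): array
{
    $headers = [];
    foreach ($attributes as $name => $value) {
        // Cast defensively so every header name and value is a string.
        $headers[(string) $name] = (string) $value;
    }

    // A third-party publisher does not send a "type" header, so fall back to
    // a fixed message class unless one was provided.
    $headers['type'] ??= $fallbackType;

    return ['body' => $body, 'headers' => $headers];
}

// Usage with illustrative values:
$encoded = buildEncodedEnvelope(
    '{"id":1}',
    ['X-Shopify-Topic' => 'orders/create'],
    'App\Message\Shopify\PubSubMessage'
);
```

With an array like this, a custom decode() could read $encodedEnvelope['headers'] instead of ignoring headers entirely.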

Does anyone have an idea of what I should do next to get the headers interpreted properly?

Here is the code I have working so far:

  • messenger.yaml :
framework:
  messenger:
    default_bus: default
    transports:
      pubsub_queue:
        dsn: 'enqueue://gps?queue[name]=%env(resolve:PUBSUB_SUBSCRIPTION_NAME)%&topic[name]=%env(resolve:PUBSUB_TOPIC_NAME)%'
        serializer: App\Serializer\MessengerJsonSerializer
    routing:
      App\Message\Shopify\PubSubMessage: pubsub_queue
  • App\Message\Shopify\PubSubMessage :
<?php
declare(strict_types=1);

namespace App\Message\Shopify;

class PubSubMessage
{
    private array $payload;

    public function __construct(array $payload)
    {
        $this->payload = $payload;
    }

    public function getPayload(): array
    {
        return $this->payload;
    }
}
  • \App\MessageHandler\Shopify\PubSubMessageHandler :
<?php
declare(strict_types=1);

namespace App\MessageHandler\Shopify;

use App\Message\Shopify\PubSubMessage;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
final class PubSubMessageHandler
{
    public function __invoke(PubSubMessage $message): void
    {
        $payload = $message->getPayload();
    }
}
  • \App\Serializer\MessengerJsonSerializer :
<?php
declare(strict_types=1);

namespace App\Serializer;

use App\Message\Shopify\PubSubMessage;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Stamp\SerializedMessageStamp;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Serializer\SerializerInterface as SymfonySerializerInterface;
use function json_validate;

class MessengerJsonSerializer implements SerializerInterface
{
    private SymfonySerializerInterface $serializer;
    private string                     $format;

    public function __construct(SymfonySerializerInterface $serializer, string $format = 'json')
    {
        $this->serializer = $serializer;
        $this->format     = $format;
    }

    public function decode(array $encodedEnvelope): Envelope
    {
        if (empty($encodedEnvelope['body'])) {
            throw new MessageDecodingFailedException('Encoded envelope should have at least a "body", or maybe you should implement your own serializer.');
        }

        $body = $encodedEnvelope['body'];

        if (!json_validate($body)) {
            throw new MessageDecodingFailedException('Invalid JSON data: ' . json_last_error_msg());
        }

        $message = new PubSubMessage(json_decode($body, true));

        return new Envelope($message, []);
    }

    public function encode(Envelope $envelope): array
    {
        $message = $envelope->getMessage();
        $headers = ['type' => get_class($message)];

        $body = $this->serializer->serialize($message, $this->format);

        return [
            'body'    => $body,
            'headers' => $headers,
        ];
    }
}

Thanks for your help.
