univeros/messaging — Symfony Messenger bridge with scaffold + worker #21

@tonydspaniard

Goal

Build univeros/messaging — a thin wrapper over Symfony Messenger that integrates with Altair\Container\Container and provides:

  1. A simple dispatch(object $message) API exposed through a MessageBus shared service.
  2. A bin/altair worker command to consume from configured transports.
  3. A queue: block in the scaffolder (Queue Library #3) that emits job classes + handlers + bus wiring.

Like #4, we're wrapping, not rewriting. Symfony Messenger has a 6+ year head start on transport adapters (AMQP, Redis Streams, SQS, Doctrine, Beanstalkd, Sync, InMemory) and a strong middleware story.

Why Messenger

The "should we build a queue package?" question came up earlier. The honest answer was: don't build the transports, ride Messenger. This issue is the bridge that makes that ergonomic.

The framework's value-add over raw Messenger:

  • DI'd handler resolution through the framework's container, not Symfony's
  • The same Action/Input shape applied to jobs — a job is just an asynchronous Action
  • Spec-driven scaffolding so queue: in the spec wires the whole chain
  • A WorkerCommand that integrates with the framework's signal handling, OpenTelemetry hooks (future), and exit-code conventions

Shape

src/Altair/Messaging/
├── Contracts/
│   ├── MessageBusInterface.php       # tiny — just dispatch(object): Envelope
│   └── HandlerInterface.php          # marker; handlers are __invoke-able classes
├── MessageBus.php                    # default impl, delegates to Symfony\Component\Messenger\MessageBus
├── HandlerLocator.php                # framework-container-backed locator for Messenger
├── Middleware/
│   ├── ContainerHandlerMiddleware.php
│   └── LoggingMiddleware.php         # optional, off by default
├── Cli/
│   ├── WorkerCommand.php             # `bin/altair worker [--transports=...]`
│   └── FailedRetryCommand.php        # `bin/altair worker:retry-failed`
├── Configuration/
│   └── MessengerConfiguration.php    # transport binding from env
├── Attribute/
│   └── AsHandler.php                 # #[AsHandler(MessageClass::class)]
└── composer.json
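
To make the HandlerLocator entry above more concrete, here is a minimal sketch of a container-backed locator that builds handlers lazily. It does not use Symfony's real locator interface. ArrayContainer is a hypothetical stand-in for Altair\Container\Container (whose actual API may differ), and ContainerHandlerLocator, Ping and PingHandler are illustrative names only.

```php
<?php
declare(strict_types=1);

/** Hypothetical stand-in for the framework container; the real API may differ. */
final class ArrayContainer
{
    /** @var array<string, object> */
    private array $instances = [];

    /** @param array<class-string, callable(): object> $factories */
    public function __construct(private array $factories) {}

    public function get(string $id): object
    {
        // Build on first use, then reuse the shared instance.
        return $this->instances[$id] ??= ($this->factories[$id])();
    }
}

/** Illustrative locator: resolves handlers from the container only when a message arrives. */
final class ContainerHandlerLocator
{
    /** @param array<class-string, list<class-string>> $map message class => handler classes */
    public function __construct(private ArrayContainer $container, private array $map) {}

    /** @return iterable<callable> */
    public function getHandlers(object $message): iterable
    {
        foreach ($this->map[$message::class] ?? [] as $handlerClass) {
            yield $this->container->get($handlerClass);
        }
    }
}

final class Ping
{
    public function __construct(public readonly string $text) {}
}

final class PingHandler
{
    /** @var list<string> */
    public array $seen = [];

    public function __invoke(Ping $message): void
    {
        $this->seen[] = $message->text;
    }
}

$built = 0;
$container = new ArrayContainer([
    PingHandler::class => function () use (&$built): object {
        $built++;
        return new PingHandler();
    },
]);
$locator = new ContainerHandlerLocator($container, [Ping::class => [PingHandler::class]]);

$message = new Ping('hello');
foreach ($locator->getHandlers($message) as $handler) {
    $handler($message);
}
```

The handler is not constructed until getHandlers() is iterated, so a worker that never sees a given message type would not pay for that handler's dependencies.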

API surface

Dispatching

use Altair\Messaging\Contracts\MessageBusInterface;

final readonly class CreateUser
{
    public function __construct(private MessageBusInterface $bus) {}

    public function __invoke(CreateUserInput $input): User
    {
        $user = /* ... create the user ... */;
        $this->bus->dispatch(new SendWelcomeEmail($user->id, $user->email));
        return $user;
    }
}

Handlers

namespace App\Messages;

use Altair\Messaging\Attribute\AsHandler;

#[AsHandler(SendWelcomeEmail::class)]
final class SendWelcomeEmailHandler
{
    public function __construct(private readonly MailerInterface $mailer) {}

    public function __invoke(SendWelcomeEmail $message): void
    {
        // ...
    }
}

That's it: no MessageHandlerInterface marker and no MessageSubscriberInterface::getHandledMessages() boilerplate. The attribute + __invoke is the contract.
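
One way the attribute could be turned into a handler map is reflection over a list of candidate classes. The sketch below is an assumption about how discovery might work, not the package's implementation: the AsHandler class here is a local stand-in for Altair\Messaging\Attribute\AsHandler, and discoverHandlers() is a hypothetical helper.

```php
<?php
declare(strict_types=1);

// Stand-in for Altair\Messaging\Attribute\AsHandler; the real attribute may differ.
#[Attribute(Attribute::TARGET_CLASS)]
final class AsHandler
{
    /** @param class-string $messageClass */
    public function __construct(public readonly string $messageClass) {}
}

final class SendWelcomeEmail
{
    public function __construct(
        public readonly string $userId,
        public readonly string $email,
    ) {}
}

#[AsHandler(SendWelcomeEmail::class)]
final class SendWelcomeEmailHandler
{
    public function __invoke(SendWelcomeEmail $message): void
    {
        // ... send the email ...
    }
}

/**
 * Hypothetical helper: builds message class => handler classes
 * by reading #[AsHandler] attributes via reflection.
 *
 * @param list<class-string> $candidates
 * @return array<class-string, list<class-string>>
 */
function discoverHandlers(array $candidates): array
{
    $map = [];
    foreach ($candidates as $class) {
        $reflection = new ReflectionClass($class);
        foreach ($reflection->getAttributes(AsHandler::class) as $attribute) {
            if (!$reflection->hasMethod('__invoke')) {
                throw new LogicException("$class is tagged #[AsHandler] but has no __invoke().");
            }
            $map[$attribute->newInstance()->messageClass][] = $class;
        }
    }

    return $map;
}

// Classes without the attribute (here, the message itself) are ignored.
$map = discoverHandlers([SendWelcomeEmailHandler::class, SendWelcomeEmail::class]);
```

How the candidate list is produced (directory scan, Composer classmap, explicit registration) is left open here.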

Configuration

.env:

MESSENGER_TRANSPORT_DEFAULT=redis://localhost:6379/messages
# or
MESSENGER_TRANSPORT_DEFAULT=doctrine://default?queue_name=default
# or for tests
MESSENGER_TRANSPORT_DEFAULT=sync://

MessengerConfiguration parses these DSNs and wires Symfony's TransportFactory chain. Multiple transports are supported via additional variables (MESSENGER_TRANSPORT_HIGH=..., etc.), and messages are routed to transports by class through container config.
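
As a rough illustration of the env parsing step, the sketch below collects MESSENGER_TRANSPORT_* variables into a name => DSN map. The function name transportsFromEnv() and the rule that the lowercased suffix becomes the transport name are assumptions. Handing the DSNs to Symfony's transport factories is deliberately not shown.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: collect transport DSNs from environment variables.
 * MESSENGER_TRANSPORT_DEFAULT => "default", MESSENGER_TRANSPORT_HIGH => "high".
 *
 * @param array<string, string> $env
 * @return array<string, string> transport name => DSN
 */
function transportsFromEnv(array $env): array
{
    $prefix = 'MESSENGER_TRANSPORT_';
    $transports = [];

    foreach ($env as $key => $value) {
        $key = (string) $key;
        if (!str_starts_with($key, $prefix) || $value === '') {
            continue;
        }

        $name = strtolower(substr($key, strlen($prefix)));
        if ($name === '') {
            continue;
        }

        // Require a scheme so typos fail at boot rather than when the worker starts.
        if (!preg_match('#^[a-z][a-z0-9+.-]*://#i', $value)) {
            throw new InvalidArgumentException("$key is not a valid transport DSN.");
        }

        $transports[$name] = $value;
    }

    return $transports;
}

$transports = transportsFromEnv([
    'APP_ENV' => 'test',
    'MESSENGER_TRANSPORT_DEFAULT' => 'sync://',
    'MESSENGER_TRANSPORT_HIGH' => 'redis://localhost:6379/messages',
]);
```

A regular expression is used instead of parse_url() because parse_url() returns false for a bare `sync://`.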

Worker CLI

bin/altair worker                          # consume default transport
bin/altair worker --transports=default,high # multiple
bin/altair worker --time-limit=3600       # exit after N seconds (for systemd / Kubernetes)
bin/altair worker --memory-limit=128M     # exit when memory exceeds limit
bin/altair worker --limit=100             # exit after N messages

bin/altair worker:retry-failed            # retry from the failure transport
bin/altair worker:show-failed             # inspect failed messages

Graceful shutdown on SIGTERM / SIGINT. Exit codes follow Symfony conventions (Command::SUCCESS = 0, Command::FAILURE = 1, Command::INVALID = 2).
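
The stop conditions above can be sketched as a plain loop that checks its limits only between messages, so a message in flight always finishes before the worker exits. This is a simplified illustration, not how Symfony's Worker is implemented. runWorker() and parseMemoryLimit() are hypothetical names, and the pcntl calls are guarded because the extension may not be loaded.

```php
<?php
declare(strict_types=1);

/** Hypothetical helper: convert "128M" style limits to bytes. */
function parseMemoryLimit(string $limit): int
{
    if (!preg_match('/^(\d+)([KMG]?)$/i', trim($limit), $m)) {
        throw new InvalidArgumentException("Invalid memory limit: $limit");
    }
    $factor = ['' => 1, 'K' => 1024, 'M' => 1024 ** 2, 'G' => 1024 ** 3];

    return (int) $m[1] * $factor[strtoupper($m[2])];
}

/**
 * Hypothetical worker loop. $receive returns the next message or null when
 * the transport is idle; $handle processes one message.
 *
 * @return int number of messages handled before stopping
 */
function runWorker(
    callable $receive,
    callable $handle,
    ?int $limit = null,
    ?int $timeLimit = null,
    ?int $memoryLimit = null,
): int {
    $shouldStop = false;

    if (function_exists('pcntl_async_signals')) {
        pcntl_async_signals(true);
        // The handler only sets a flag; the loop decides when it is safe to exit.
        $stop = static function () use (&$shouldStop): void {
            $shouldStop = true;
        };
        pcntl_signal(SIGTERM, $stop);
        pcntl_signal(SIGINT, $stop);
    }

    $startedAt = time();
    $handled = 0;

    while (!$shouldStop) {
        $message = $receive();
        if ($message === null) {
            usleep(100_000); // idle: avoid a busy loop
        } else {
            $handle($message);
            $handled++;
        }

        $shouldStop = $shouldStop
            || ($limit !== null && $handled >= $limit)
            || ($timeLimit !== null && time() - $startedAt >= $timeLimit)
            || ($memoryLimit !== null && memory_get_usage(true) > $memoryLimit);
    }

    return $handled;
}

// Usage: five queued messages, stop after three (the --limit case).
$queue = ['a', 'b', 'c', 'd', 'e'];
$seen = [];
$handled = runWorker(
    function () use (&$queue) { return array_shift($queue); },
    function ($message) use (&$seen): void { $seen[] = $message; },
    limit: 3,
);
```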

Scaffolder extension (depends on #3)

#3's spec format gains a queue: block:

endpoint:
  method: POST
  path: /users
  ...

queue:
  on_create:
    message: App\Messages\SendWelcomeEmail
    fields:
      user_id: string
      email: string
    transport: default

The scaffolder now emits:

app/Messages/SendWelcomeEmail.php          # message DTO (readonly)
app/Messages/SendWelcomeEmailHandler.php   # handler stub with TODO
tests/Messages/SendWelcomeEmailHandlerTest.php   # golden test stub

…and arranges that CreateUser (the domain class) gets a MessageBusInterface injected so it can dispatch the message when the endpoint succeeds.
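
For illustration, the message DTO emission could look roughly like the renderer below. It assumes that snake_case spec fields become camelCase constructor-promoted properties and that spec types map directly to PHP types. Neither rule is stated in the spec above, and renderMessageClass() is a hypothetical name.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical renderer: turn one queue: entry into the source of a readonly message DTO.
 *
 * @param array<string, string> $fields snake_case field name => PHP type
 */
function renderMessageClass(string $fqcn, array $fields): string
{
    $pos = strrpos($fqcn, '\\');
    $namespace = $pos === false ? '' : substr($fqcn, 0, $pos);
    $class = $pos === false ? $fqcn : substr($fqcn, $pos + 1);

    $params = [];
    foreach ($fields as $name => $type) {
        // user_id => userId
        $property = lcfirst(str_replace('_', '', ucwords($name, '_')));
        $params[] = "        public {$type} \${$property},";
    }

    $lines = ['<?php', '', 'declare(strict_types=1);', ''];
    if ($namespace !== '') {
        $lines[] = "namespace {$namespace};";
        $lines[] = '';
    }

    $lines = array_merge(
        $lines,
        ["final readonly class {$class}", '{', '    public function __construct('],
        $params,
        ['    ) {}', '}', ''],
    );

    return implode("\n", $lines);
}

$source = renderMessageClass(
    'App\Messages\SendWelcomeEmail',
    ['user_id' => 'string', 'email' => 'string'],
);
echo $source;
```

The parameter order follows the spec's field order, which matches the positional `new SendWelcomeEmail($user->id, $user->email)` call in the dispatching example.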

Acceptance criteria

  • MessageBusInterface::dispatch works against sync://, redis://, and doctrine:// transports
  • Handlers discovered by #[AsHandler(MessageClass::class)] attribute scanning (no manual registration required for the common case)
  • bin/altair worker consumes messages, runs handlers, handles SIGTERM gracefully
  • Failed messages route to a configurable failure transport; worker:retry-failed re-dispatches them
  • Scaffolder queue: block produces a working message + handler + test
  • Tests:
    • Unit: MessageBus, HandlerLocator, ContainerHandlerMiddleware
    • Integration: end-to-end dispatch + consume against redis transport (CI service container)
    • Scaffolder integration: spec → handler emitted → handler runs
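
The failure-transport criterion can be pictured with an in-memory model: a message whose handler throws is parked on a failure queue, and a retry step moves parked messages back to their target. This sketch skips Messenger's retry strategy (retries with delay before a message is considered failed) and uses hypothetical names throughout (InMemoryQueue, consume(), retryFailed()).

```php
<?php
declare(strict_types=1);

/** Hypothetical in-memory stand-in for a transport. */
final class InMemoryQueue
{
    /** @var list<object> */
    private array $messages = [];

    public function push(object $message): void
    {
        $this->messages[] = $message;
    }

    public function pop(): ?object
    {
        return array_shift($this->messages);
    }

    public function count(): int
    {
        return count($this->messages);
    }
}

/** Drain $source; anything whose handler throws is parked on $failed. */
function consume(InMemoryQueue $source, InMemoryQueue $failed, callable $handler): void
{
    while (($message = $source->pop()) !== null) {
        try {
            $handler($message);
        } catch (Throwable) {
            $failed->push($message);
        }
    }
}

/** Move every parked message back to $target; returns how many were moved. */
function retryFailed(InMemoryQueue $failed, InMemoryQueue $target): int
{
    $moved = 0;
    while (($message = $failed->pop()) !== null) {
        $target->push($message);
        $moved++;
    }

    return $moved;
}

$default = new InMemoryQueue();
$failed = new InMemoryQueue();
$default->push((object) ['id' => 1]);
$default->push((object) ['id' => 2]);

$mailerUp = false;
$done = [];
$handler = function (object $message) use (&$mailerUp, &$done): void {
    if ($message->id === 2 && !$mailerUp) {
        throw new RuntimeException('mailer down');
    }
    $done[] = $message->id;
};

consume($default, $failed, $handler);      // message 2 is parked on $failed
$mailerUp = true;
$moved = retryFailed($failed, $default);   // the worker:retry-failed step
consume($default, $failed, $handler);
```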

Out of scope

  • Scheduled / cron jobs (symfony/scheduler integration could be a follow-up)
  • Sagas / process managers (these patterns can be built on top of Messenger; we can document them later)
  • Outbox pattern (deserves its own package later)
  • Kafka transport (Messenger doesn't ship it; users can add a third-party transport such as koco/messenger-kafka)

Dependencies

New composer deps:

  • symfony/messenger: ^7.0
  • symfony/serializer: ^7.0 (needed only for Messenger's Symfony Serializer-based message serializer; the default PhpSerializer works without it)

Optional (per-transport):
