Asynchronous bulk actions for Polysource: execute over Symfony Messenger, with live progress (Mercure) and mid-flight cancellation.
Part of the Polysource monorepo. MIT-licensed.
The synchronous bulk action ("Retry 5 000 failed messages") times out after 30 s in production. This package fans the work out to Messenger workers, persists per-job progress every 5 records or 500 ms, and exposes a JSON endpoint + Mercure topic so the UI can show a live progress bar and a "Cancel" button.
See ADR-024.
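The "every 5 records or 500 ms" rule can be illustrated with a small sketch. `ProgressThrottle` is a hypothetical helper written for this README, not a class shipped by the package; the handler's real implementation may differ. It only shows the idea of flushing progress when either threshold is reached, whichever comes first.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of polysource/bulk-async): decides when
 * accumulated progress should be flushed to storage.
 */
final class ProgressThrottle
{
    private int $pendingRecords = 0;
    private float $lastFlushAt;

    public function __construct(
        private readonly int $maxRecords = 5,      // flush after this many records...
        private readonly float $maxSeconds = 0.5,  // ...or after this much time
        ?float $now = null,                        // injectable clock, for tests
    ) {
        $this->lastFlushAt = $now ?? microtime(true);
    }

    /** Call once per processed record; returns true when a persist is due. */
    public function tick(?float $now = null): bool
    {
        $now ??= microtime(true);
        ++$this->pendingRecords;

        $due = $this->pendingRecords >= $this->maxRecords
            || ($now - $this->lastFlushAt) >= $this->maxSeconds;

        if ($due) {
            // Reset both counters so the next window starts fresh.
            $this->pendingRecords = 0;
            $this->lastFlushAt = $now;
        }

        return $due;
    }
}
```

A worker loop would call `tick()` after each record and persist the job only when it returns `true`, plus once more at the end so the final state is never lost.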
- BulkJob immutable VO (12 fields, 8 KiB error cap) + BulkJobStatus enum (5 states, isTerminal()).
- BulkJobStorageInterface + DoctrineBulkJobStorage + Doctrine entity.
- BulkJobMessage + BulkJobHandler: re-fetches each iteration to honour Cancelled, throttled persist (5 records OR 500 ms), per-record exception isolation.
- AsyncBulkActionDispatcher: UUID v7 + Pending persist + Messenger dispatch.
- AsyncAwareBulkActionInterface: opt-in marker (parallel interface, no BC break to BulkActionInterface).
- BulkJobResource: browsable admin resource (#[AsResource], slug bulk-jobs).
- CancelBulkJobAction: idempotent on terminal, gated by POLYSOURCE_BULK_JOB_CANCEL.
- ProgressController: JSON GET /admin/bulk-jobs/{id}/progress. Two-stage gate: coarse POLYSOURCE_BULK_JOB_VIEW permission + ownership check (requester must own the job, or hold POLYSOURCE_BULK_JOB_VIEW_ANY).
- MercureBulkJobBroadcaster: gated on class_exists(HubInterface), hub failures swallowed, topic polysource/bulk-jobs/{actorId}/{id} (actor segment URL-encoded). Pair with the polysource_bulk_progress_topic(job) Twig helper so client and broadcaster always agree on the topic shape; configure your Mercure JWT subscriber claims to restrict per-actor for defence-in-depth.
- Stimulus progress_controller.js: EventSource (Mercure), with automatic fallback to polling on error.
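To make the topic shape concrete, here is a minimal sketch of how such a topic string could be assembled. `bulkJobTopic()` is a hypothetical function for illustration only, and the use of `rawurlencode()` is an assumption: the list above says only that the actor segment is URL-encoded, not which encoder is used. In real code, prefer the polysource_bulk_progress_topic(job) Twig helper so both sides stay in sync.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical illustration of the topic shape
 * polysource/bulk-jobs/{actorId}/{id}.
 * Assumption: the actor segment is encoded with rawurlencode().
 */
function bulkJobTopic(string $actorId, string $jobId): string
{
    // Encoding the actor segment keeps characters such as "/" from
    // adding extra path segments to the topic.
    return sprintf('polysource/bulk-jobs/%s/%s', rawurlencode($actorId), $jobId);
}
```

Under these assumptions, an actor identifier containing a slash (for example `user/42`) stays within a single topic segment, which is what makes per-actor restrictions in the subscriber JWT claims workable.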
    composer require polysource/bulk-async symfony/messenger
    # Optional but recommended for live progress:
    composer require symfony/mercure-bundle

Register the bundle:

    return [
        Polysource\BulkAsync\PolysourceBulkAsyncBundle::class => ['all' => true],
    ];

Run the migration to create polysource_bulk_jobs.
BulkJobStorageInterface has three methods. To persist jobs in Redis, Mongo, or your own service instead of Doctrine, implement it and alias the interface to your service in DI. The handler, the ProgressController, and the Mercure broadcaster all keep working.
See extensibility map.
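The aliasing step described above might look like the following in a PHP service configuration file. This is a sketch under assumptions: `App\BulkJob\RedisBulkJobStorage` is a hypothetical class you would write yourself, and the exact namespace of `BulkJobStorageInterface` is assumed here, so check the package source for the real one.

```php
<?php
// config/services.php

use App\BulkJob\RedisBulkJobStorage;               // hypothetical: your own implementation
use Polysource\BulkAsync\BulkJobStorageInterface;  // assumed namespace, verify in the package
use Symfony\Component\DependencyInjection\Loader\Configurator\ContainerConfigurator;

return static function (ContainerConfigurator $container): void {
    $services = $container->services();

    // Register the custom storage as a service.
    $services->set(RedisBulkJobStorage::class)
        ->autowire()
        ->autoconfigure();

    // Point the interface at it so every consumer receives the custom storage.
    $services->alias(BulkJobStorageInterface::class, RedisBulkJobStorage::class);
};
```

Because consumers depend on the interface rather than on DoctrineBulkJobStorage, the alias is the only wiring change required.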