[Messenger] Consume batch of messages (manually, instead of worker), consuming multiple messages at once, BULK message processing at once #36910

Closed
@vgrankin

Description

Greetings! I can't believe it, but I can't find any information on this seemingly trivial use case. I currently react to RabbitMQ messages in the MyConsumer::consume() method, but I don't want to process them immediately one by one, so I decided to use Messenger to dispatch them into a MySQL table via the Doctrine transport and process them in batches later.
I want to process multiple messages of one class at once, as a group. I have reasons for this: for example, to avoid one network request per message and instead send a single request covering, say, 10 messages.

Everything looked good until I realised I still have the same problem: Messenger's worker consumes one message at a time. I cannot consume a batch of messages at once with the default Worker, because I have no control over manually pulling the messages I want and acknowledging or rejecting them as a group.

This is a disaster! I really want Messenger's functionality, such as moving messages to the failed queue after delivery has failed several times. But I want to process messages of a given class in bulk, and then either acknowledge them once ALL messages in the batch have been handled or reject all of them. Also, there is no way I can group these messages in advance; I can only do it at a later stage, once the messages have ended up in the messenger_messages table.

More precisely: messages come in and I want to put them into a MySQL table. Then I have a cron job that runs, let's say, every 10 minutes. In this cron job I want to iterate over all messages of some type and aggregate their data into one object, which I will then try to send over the network. If this fails X times (the server is down, whatever), I want to move all messages from this iteration to the failed queue and, at some point, retry some or all of them.

Is there any way around this, or should I write a custom worker? What I need is more like a cron job that processes batches than a one-by-one message listener/worker. Here is a prototype of what I mean, if the Worker processed messages in bulk; I hope it makes sense:

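foreach ($this->receivers as $transportName => $receiver) {
    // Pull everything the receiver currently offers as one batch.
    $envelopes = $receiver->get();

    // Collect the partial data of every message of the class I care about.
    foreach ($envelopes as $envelope) {
        if ($envelope->getMessage() instanceof \App\Message\Entities\LuckyEntity) {
            $data = $this->extractPartialData($envelope);
            $this->fullData->append($data);
        }
    }

    try {
        // One network request for the whole batch.
        $this->process($this->fullData);
    } catch (\Exception $e) {
        // rejectAll() does not exist yet; it is part of what I am asking for.
        $this->rejectAll($envelopes);

        return;
    }

    // ackAll() does not exist yet either; acknowledge only after the whole batch succeeded.
    $this->ackAll($envelopes);
}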

I also tried to investigate how messages are written to the queue (the MySQL table) and thought about joining the data into one row instead of creating multiple rows, so that I could later extract multiple messages from a single row, but that currently doesn't look like a trivial task. I also thought about a custom Worker, or maybe some intermediate stage that aggregates or rearranges rows in the queue.
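
To make the cron flow described earlier a bit more concrete (collect every message of one type, send them in a single request, retry on later runs, and move messages to a failed queue once their attempts are used up), here is a rough, framework-free sketch. It is not Messenger's API: QueuedMessage, InMemoryQueue and processBatch() are hypothetical names, the in-memory arrays only stand in for the messenger_messages table and the failed queue, and counting attempts per message is an assumption about how "fails X times" would be tracked.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for one queued row; not a Symfony Messenger class.
final class QueuedMessage
{
    public function __construct(
        public string $type,
        public array $payload,
        public int $attempts = 0,
    ) {
    }
}

// Hypothetical in-memory stand-in for the messenger_messages table and the failed queue.
final class InMemoryQueue
{
    /** @var QueuedMessage[] */
    public array $pending = [];

    /** @var QueuedMessage[] */
    public array $failed = [];
}

/**
 * Sends all pending messages of $type in one call to $send.
 * Returns true if the batch was sent (or there was nothing to send), false if sending failed.
 */
function processBatch(InMemoryQueue $queue, string $type, callable $send, int $maxAttempts): bool
{
    $batch = array_values(array_filter(
        $queue->pending,
        fn (QueuedMessage $m): bool => $m->type === $type
    ));
    if ($batch === []) {
        return true;
    }

    // Strict in_array() compares objects by identity.
    $isInBatch = fn (QueuedMessage $m): bool => in_array($m, $batch, true);

    try {
        // One call for the whole batch instead of one call per message.
        $send(array_map(fn (QueuedMessage $m): array => $m->payload, $batch));
    } catch (\Throwable $e) {
        // "Reject all": count the attempt and move exhausted messages to the failed queue.
        foreach ($batch as $message) {
            $message->attempts++;
            if ($message->attempts >= $maxAttempts) {
                $queue->failed[] = $message;
            }
        }
        $queue->pending = array_values(array_filter(
            $queue->pending,
            fn (QueuedMessage $m): bool => !$isInBatch($m) || $m->attempts < $maxAttempts
        ));

        return false;
    }

    // "Ack all": drop the batch from the pending queue only after the send succeeded.
    $queue->pending = array_values(array_filter(
        $queue->pending,
        fn (QueuedMessage $m): bool => !$isInBatch($m)
    ));

    return true;
}
```

A cron command would call processBatch() once per run. Messages of other types are left untouched, and a failed batch stays pending until each of its messages has reached $maxAttempts.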
