
Use case for possible improvement #41

@seyfer

Hello

I want to show you a use case from our team's project. I hope it suggests ways to improve the current approach.
We believe the current Consumer::consume() method, which fetches messages from the queue with get(), isn't the best solution. The PHP AMQP extension's AMQPQueue object has its own consume() method, which blocks and passes each message to a callback, so it works like a daemon.
We couldn't find any way to create a Consumer that uses consume() instead of the default get(), so we built our own.
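
To make the difference between the two styles concrete, here is a minimal sketch. It does not talk to a broker: InMemoryQueue is a hypothetical stand-in for AMQPQueue, and drainWithGet() and drainWithConsume() are illustrative names, not part of php-amqp or HumusAmqpModule. The one behaviour borrowed from the real extension is that the consume() loop stops when the callback returns false.

```php
<?php
// Hypothetical in-memory stand-in for AMQPQueue (illustration only).
final class InMemoryQueue
{
    /** @var string[] */
    private $messages;

    public function __construct(array $messages)
    {
        $this->messages = $messages;
    }

    // Pull style: returns one message, or null when nothing is queued.
    public function get()
    {
        return array_shift($this->messages);
    }

    // Push style: hands each message to $callback until it returns false.
    // The real AMQPQueue::consume() blocks while waiting for more messages;
    // this stand-in simply stops when it runs out.
    public function consume(callable $callback)
    {
        while (null !== ($message = array_shift($this->messages))) {
            if (false === $callback($message, $this)) {
                return;
            }
        }
    }
}

// Pull loop: the caller asks for each message and decides what to do when none is there.
function drainWithGet(InMemoryQueue $queue)
{
    $handled = [];
    while (null !== ($message = $queue->get())) {
        $handled[] = $message;
    }
    return $handled;
}

// Callback loop: the queue drives delivery; the callback returns false to stop.
function drainWithConsume(InMemoryQueue $queue, $limit)
{
    $handled = [];
    $queue->consume(function ($message) use (&$handled, $limit) {
        $handled[] = $message;
        return count($handled) < $limit;
    });
    return $handled;
}
```

With a real broker, the pull loop has to sleep and retry when the queue is empty, while the callback loop just waits inside consume(); that is the behaviour we are after.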

In our config we have:

'plugin_managers' => [
    'consumer' => [
        'abstract_factories' => [
            \Application\Core\Infrastructure\Humus\Consumer\ConsumerAbstractServiceFactory::class,
        ],
    ],
    'callback' => [
        'factories' => [
            // for example
            'import.core.application.console.history.consumer.HistoryConsumer' =>
                \Application\Core\Application\Console\History\Consumer\HistoryConsumerFactory::class,
        ],
    ],
],

We copied all the code from the default ConsumerAbstractServiceFactory and changed only one line so that it uses our Consumer:

$consumer = new Consumer($queues, $idleTimeout, $waitTimeout);

This is the first pain point: replacing the consumer class should be easier than copying a whole factory.
Here is our Consumer code:

class Consumer extends \HumusAmqpModule\Consumer
{
    /**
     * @param int $msgAmount
     */
    public function consume($msgAmount = 0)
    {
        $this->target = $msgAmount;
        /** @var \AMQPQueue $queue */
        foreach($this->queues as $queue) {
            $queue->consume(function ($message, $queue) {
                if (!$this->timestampLastAck) {
                    $this->timestampLastAck = microtime(true);
                }

                try {
                    $processFlag = $this->handleDelivery($message, $queue);
                } catch (\Exception $e) {
                    $this->handleDeliveryException($e);
                    $processFlag = false;
                }
                $this->handleProcessFlag($message, $processFlag);

                $now = microtime(true);

                if ($this->countMessagesUnacked > 0
                    && ($this->countMessagesUnacked == $this->blockSize
                        || ($now - $this->timestampLastAck) > $this->idleTimeout
                    )
                ) {
                    $this->ackOrNackBlock();
                }

                if ($this->usePcntlSignalDispatch) {
                    // Check for signals
                    pcntl_signal_dispatch();
                }

                // AMQPQueue::consume() keeps running until the callback returns false
                if (!$this->keepAlive || (0 != $this->target && $this->countMessagesConsumed >= $this->target)) {
                    return false;
                }
            });
        }
    }
}

As you can see, we just moved the original consumer code into the callback passed to AMQPQueue::consume().

This approach fits our needs better.
It would be great to have an easier way to override the Consumer class, or an option to choose between the two modes of fetching messages: get() or consume().
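
One way this could look is a factory that reads the consumer class from the consumer spec instead of hard-coding it. This is only a sketch of the idea, not the module's actual API: the 'class', 'idle_timeout' and 'wait_timeout' keys, the default values, the createConsumer() function and both stand-in classes are assumptions made up for illustration.

```php
<?php
// Hypothetical stand-in for \HumusAmqpModule\Consumer (illustration only).
class BaseConsumer
{
    public $queues;
    public $idleTimeout;
    public $waitTimeout;

    public function __construct(array $queues, $idleTimeout, $waitTimeout)
    {
        $this->queues = $queues;
        $this->idleTimeout = $idleTimeout;
        $this->waitTimeout = $waitTimeout;
    }
}

// Hypothetical consume()-based variant, like the Consumer subclass in this issue.
class CallbackConsumer extends BaseConsumer
{
}

// Hypothetical factory helper: picks the consumer class from the spec,
// falling back to the default class when no 'class' key is given.
function createConsumer(array $spec, array $queues)
{
    $class = $spec['class'] ?? BaseConsumer::class;

    // Reject anything that is not the base consumer or a subclass of it.
    if (!is_a($class, BaseConsumer::class, true)) {
        throw new InvalidArgumentException(
            sprintf('%s must extend %s', $class, BaseConsumer::class)
        );
    }

    // Default timeouts are placeholder values for this sketch.
    return new $class(
        $queues,
        $spec['idle_timeout'] ?? 5.0,
        $spec['wait_timeout'] ?? 100000
    );
}
```

With something like this, switching a consumer to the consume()-based class would be a one-line config change, for example createConsumer(['class' => CallbackConsumer::class], $queues), and nobody would need to copy the abstract factory.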
