Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

code clean-up #317

Merged
merged 3 commits into from
Mar 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions src/Console/ExchangeDeleteCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?php

namespace VladimirYuldashev\LaravelQueueRabbitMQ\Console;

use Exception;
use Illuminate\Console\Command;
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Connectors\RabbitMQConnector;

class ExchangeDeleteCommand extends Command
{
protected $signature = 'rabbitmq:exchange-delete
{name : The name of the exchange to delete}
{connection=rabbitmq : The name of the queue connection to use}
{--unused=0 : Check if exchange is unused}';

protected $description = 'Delete exchange';

/**
* @param RabbitMQConnector $connector
* @throws Exception
*/
public function handle(RabbitMQConnector $connector): void
{
$config = $this->laravel['config']->get('queue.connections.'.$this->argument('connection'));

$queue = $connector->connect($config);

if (! $queue->isExchangeExists($this->argument('name'))) {
$this->warn('Exchange does not exist.');

return;
}

$queue->deleteExchange(
$this->argument('name'),
(bool) $this->option('unused')
);

$this->info('Exchange deleted successfully.');
}
}
43 changes: 43 additions & 0 deletions src/Console/QueueDeleteCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<?php

namespace VladimirYuldashev\LaravelQueueRabbitMQ\Console;

use Exception;
use Illuminate\Console\Command;
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Connectors\RabbitMQConnector;

class QueueDeleteCommand extends Command
{
protected $signature = 'rabbitmq:queue-delete
{name : The name of the queue to delete}
{connection=rabbitmq : The name of the queue connection to use}
{--unused=0 : Check if queue has no consumers}
{--empty=0 : Check if queue is empty}';

protected $description = 'Declare queue';

/**
* @param RabbitMQConnector $connector
* @throws Exception
*/
public function handle(RabbitMQConnector $connector): void
{
$config = $this->laravel['config']->get('queue.connections.'.$this->argument('connection'));

$queue = $connector->connect($config);

if (! $queue->isQueueExists($this->argument('name'))) {
$this->warn('Queue does not exist.');

return;
}

$queue->deleteQueue(
$this->argument('name'),
(bool) $this->option('unused'),
(bool) $this->option('empty')
);

$this->info('Queue deleted successfully.');
}
}
46 changes: 30 additions & 16 deletions src/Queue/Jobs/RabbitMQJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Illuminate\Contracts\Queue\Job as JobContract;
use Illuminate\Queue\Jobs\Job;
use Illuminate\Support\Arr;
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use VladimirYuldashev\LaravelQueueRabbitMQ\Horizon\RabbitMQQueue as HorizonRabbitMQQueue;
Expand Down Expand Up @@ -55,7 +56,7 @@ public function __construct(
*/
public function getJobId()
{
return json_decode($this->message->getBody(), true)['id'] ?? null;
return $this->decoded['id'] ?? null;
}

/**
Expand All @@ -71,31 +72,26 @@ public function getRawBody(): string
*/
public function attempts(): int
{
/** @var AMQPTable|null $headers */
$headers = Arr::get($this->message->get_properties(), 'application_headers');

if (! $headers) {
if (! $data = $this->getRabbitMQMessageHeaders()) {
return 1;
}

$data = $headers->getNativeData();

$laravelAttempts = (int) Arr::get($data, 'laravel.attempts', 0);

return ($laravelAttempts) + 1;
return $laravelAttempts + 1;
}

/**
* @param null $e
* {@inheritdoc}
*/
public function fail($e = null): void
public function markAsFailed(): void
{
parent::markAsFailed();

// We must tel rabbitMQ this Job is failed
// The message must be rejected when the Job marked as failed, in case rabbitMQ wants to do some extra magic.
// like: Death lettering the message to an other exchange/routing-key.
$this->rabbitmq->reject($this);

parent::fail($e);
}

/**
Expand All @@ -120,18 +116,21 @@ public function delete(): void
}

/**
* {@inheritdoc}
* Release the job back into the queue.
*
* @param int $delay
* @throws AMQPProtocolChannelException
*/
public function release($delay = 0): void
{
parent::release();

// Always create a new message when this Job is released
$this->rabbitmq->laterRaw($delay, $this->message->body, $this->queue, $this->attempts());
$this->rabbitmq->laterRaw($delay, $this->message->getBody(), $this->queue, $this->attempts());

// Releasing a Job means the message was failed to process.
// Because this Job is always recreated and pushed as new message, this Job is correctly handled.
// We must tell rabbitMQ this fact.
// Because this Job message is always recreated and pushed as new message, this Job message is correctly handled.
// We must tell rabbitMQ this job message can be removed by acknowledging the message.
$this->rabbitmq->ack($this);
}

Expand All @@ -154,4 +153,19 @@ public function getRabbitMQMessage(): AMQPMessage
{
return $this->message;
}

/**
* Get the headers from the rabbitMQ message.
*
* @return array|null
*/
protected function getRabbitMQMessageHeaders(): ?array
{
/** @var AMQPTable|null $headers */
if (! $headers = Arr::get($this->message->get_properties(), 'application_headers')) {
return null;
}

return $headers->getNativeData();
}
}
Loading