Skip to content
This repository has been archived by the owner on Dec 4, 2023. It is now read-only.

Commit

Permalink
Added IProducer + fixed some issues
Browse files Browse the repository at this point in the history
  • Loading branch information
bckp committed Aug 31, 2022
1 parent b87cf1a commit a17fd40
Show file tree
Hide file tree
Showing 9 changed files with 56 additions and 66 deletions.
4 changes: 2 additions & 2 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
namespace Contributte\RabbitMQ;

use Contributte\RabbitMQ\Producer\Exception\ProducerFactoryException;
use Contributte\RabbitMQ\Producer\Producer;
use Contributte\RabbitMQ\Producer\IProducer;
use Contributte\RabbitMQ\Producer\ProducerFactory;

/**
Expand All @@ -23,7 +23,7 @@ public function __construct(private ProducerFactory $producerFactory)
/**
* @throws ProducerFactoryException
*/
public function getProducer(string $name): Producer
public function getProducer(string $name): IProducer
{
return $this->producerFactory->getProducer($name);
}
Expand Down
4 changes: 2 additions & 2 deletions src/Connection/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public function sendHeartbeat(): void
$this->getWriter()->appendFrame(new Protocol\HeartbeatFrame(), $this->writeBuffer);
$this->flushWriteBuffer();

$this->options['heartbeat_callback']?->call($this);
$this->options['heartbeat_callback'] && $this->options['heartbeat_callback']();
}

public function syncDisconnect(int $replyCode = 0, string $replyText = ""): bool
Expand Down Expand Up @@ -110,7 +110,7 @@ public function run($maxSeconds = null): void
}

do {
$this->options['cycle_callback']?->call($this);
$this->options['cycle_callback'] && $this->options['cycle_callback']();
if (!empty($this->queue)) {
$frame = array_shift($this->queue);
} else {
Expand Down
5 changes: 5 additions & 0 deletions src/Connection/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
use Bunny\Channel;
use Bunny\Exception\ClientException;
use Contributte\RabbitMQ\Connection\Exception\ConnectionException;
use function in_array;
use function max;
use function time;

final class Connection implements IConnection
{
Expand Down Expand Up @@ -65,6 +68,7 @@ public function __construct(
$this->heartbeat = max($heartbeat, self::HEARTBEAT_INTERVAL);

if (!$lazy) {
$this->lastBeat = time();
$this->bunnyClient->connect();
}
}
Expand Down Expand Up @@ -134,6 +138,7 @@ public function connectIfNeeded(): void
return;
}

$this->lastBeat = time();
$this->bunnyClient->connect();
}

Expand Down
62 changes: 17 additions & 45 deletions src/Consumer/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,11 @@ public function __construct(
$this->callback = $callback;
}


public function getQueue(): IQueue
{
return $this->queue;
}


public function getCallback(): callable
{
return $this->callback;
Expand Down Expand Up @@ -82,50 +80,24 @@ public function consume(?int $maxSeconds = null, ?int $maxMessages = null): void

protected function sendResponse(Message $message, Channel $channel, int $result, Client $client): void
{
switch ($result) {
case IConsumer::MESSAGE_ACK:
// Acknowledge message
$channel->ack($message);

break;

case IConsumer::MESSAGE_NACK:
// Message will be requeued
$channel->nack($message);

break;

case IConsumer::MESSAGE_REJECT:
// Message will be discarded
$channel->reject($message, false);

break;

case IConsumer::MESSAGE_REJECT_AND_TERMINATE:
// Message will be discarded
$channel->reject($message, false);
$client->stop();

break;

case IConsumer::MESSAGE_ACK_AND_TERMINATE:
// Acknowledge message and terminate
$channel->ack($message);
$client->stop();

break;

case IConsumer::MESSAGE_NACK_AND_TERMINATE:
// Message will be requeued
$channel->nack($message);
$client->stop();

break;
match ($result) {
IConsumer::MESSAGE_ACK, IConsumer::MESSAGE_ACK_AND_TERMINATE => $channel->ack($message),
IConsumer::MESSAGE_NACK, IConsumer::MESSAGE_NACK_AND_TERMINATE => $channel->nack($message),
IConsumer::MESSAGE_REJECT, IConsumer::MESSAGE_REJECT_AND_TERMINATE => $channel->reject($message, false),
default => throw new \InvalidArgumentException("Unknown return value of consumer [{$this->name}] user callback"),
};

default:
throw new \InvalidArgumentException(
"Unknown return value of consumer [{$this->name}] user callback"
);
if (in_array(
$result,
[
IConsumer::MESSAGE_REJECT_AND_TERMINATE,
IConsumer::MESSAGE_ACK_AND_TERMINATE,
IConsumer::MESSAGE_NACK_AND_TERMINATE
],
true
)
) {
$client->stop();
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/Diagnostics/BarPanel.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Contributte\RabbitMQ\Diagnostics;

use Contributte\RabbitMQ\Producer\Producer;
use Contributte\RabbitMQ\Producer\IProducer;
use Contributte\RabbitMQ\Producer\ProducerFactory;
use Nette\Utils\Html;
use Tracy\IBarPanel;
Expand All @@ -26,7 +26,7 @@ class BarPanel implements IBarPanel
public function __construct(private ProducerFactory $producerFactory)
{
$this->producerFactory->addOnCreatedCallback(
function (string $name, Producer $producer): void {
function (string $name, IProducer $producer): void {
$this->sentMessages[$name] = [];
$producer->addOnPublishCallback(
function (string $message) use ($name): void {
Expand Down
17 changes: 17 additions & 0 deletions src/Producer/IProducer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

declare(strict_types=1);

namespace Contributte\RabbitMQ\Producer;

interface IProducer
{
public const DELIVERY_MODE_NON_PERSISTENT = 1;
public const DELIVERY_MODE_PERSISTENT = 2;

/**
* @param array<string, string|int> $headers
*/
public function publish(string $message, array $headers = [], ?string $routingKey = null): void;
public function addOnPublishCallback(callable $callback): void;
}
6 changes: 1 addition & 5 deletions src/Producer/Producer.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,8 @@
use Contributte\RabbitMQ\LazyDeclarator;
use Contributte\RabbitMQ\Queue\IQueue;

final class Producer
final class Producer implements IProducer
{

public const DELIVERY_MODE_NON_PERSISTENT = 1;
public const DELIVERY_MODE_PERSISTENT = 2;

/**
* @var callable[]
*/
Expand Down
18 changes: 9 additions & 9 deletions src/Producer/ProducerFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,22 @@ final class ProducerFactory
public array $createdCallbacks = [];

/**
* @var Producer[]
* @var IProducer[]
*/
private array $producers = [];


public function __construct(private ProducersDataBag $producersDataBag, private QueueFactory $queueFactory, private ExchangeFactory $exchangeFactory, private LazyDeclarator $lazyDeclarator)
{
public function __construct(
private ProducersDataBag $producersDataBag,
private QueueFactory $queueFactory,
private ExchangeFactory $exchangeFactory,
private LazyDeclarator $lazyDeclarator
) {
}


/**
* @throws ProducerFactoryException
*/
public function getProducer(string $name): Producer
public function getProducer(string $name): IProducer
{
if (!isset($this->producers[$name])) {
$this->producers[$name] = $this->create($name);
Expand All @@ -40,17 +42,15 @@ public function getProducer(string $name): Producer
return $this->producers[$name];
}


public function addOnCreatedCallback(callable $callback): void
{
$this->createdCallbacks[] = $callback;
}


/**
* @throws ProducerFactoryException
*/
private function create(string $name): Producer
private function create(string $name): IProducer
{
try {
$producerData = $this->producersDataBag->getDataByKey($name);
Expand Down
2 changes: 1 addition & 1 deletion src/Producer/ProducersDataBag.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public function __construct(array $data)
*/
public function addProducerByData(string $producerName, array $data): void
{
$data['deliveryMode'] ??= Producer::DELIVERY_MODE_PERSISTENT;
$data['deliveryMode'] ??= IProducer::DELIVERY_MODE_PERSISTENT;
$data['contentType'] ??= 'text/plain';
$data['exchange'] ??= null;
$data['queue'] ??= null;
Expand Down

0 comments on commit a17fd40

Please sign in to comment.