Skip to content

Commit 8a924da

Browse files
author
Nick
committed
allow to set noAck to the consumer
1 parent 3f69f1c commit 8a924da

File tree

2 files changed

+17
-1
lines changed

2 files changed

+17
-1
lines changed

src/Console/ConsumeCommand.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ class ConsumeCommand extends WorkCommand
2929
{--consumer-tag}
3030
{--prefetch-size=0}
3131
{--prefetch-count=1000}
32+
{--no-acknowledged=false : Консуматорът трябва изрично да изпрати потвърждение (acknowledgement) обратно към брокера след като обработи съобщението. Ако консуматорът не изпрати такова потвърждение, брокерът няма да маркира съобщението като обработено и ще го запази в опашката (или потенциално ще го достави на друг консуматор). Този подход се използва за гарантиране, че съобщенията не се губят при срив на консуматора или други проблеми при обработката.}
3233
';
3334

3435
protected $description = 'Consume messages';
@@ -41,9 +42,12 @@ public function handle(): void
4142
$consumer->setContainer($this->laravel);
4243
$consumer->setName($this->option('name'));
4344
$consumer->setConsumerTag($this->consumerTag());
45+
$consumer->setNoAcknowledged($this->consumerTag());
4446
$consumer->setMaxPriority((int) $this->option('max-priority'));
4547
$consumer->setPrefetchSize((int) $this->option('prefetch-size'));
4648
$consumer->setPrefetchCount((int) $this->option('prefetch-count'));
49+
$noAck = $this->option('no-acknowledged') === 'true';
50+
$consumer->setNoAcknowledged($noAck);
4751

4852
parent::handle();
4953
}

src/Consumer.php

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ class Consumer extends Worker
3535
/** @var object|null */
3636
protected $currentJob;
3737

38+
protected $noAcknowledged = false;
39+
3840
public function setContainer(Container $value): void
3941
{
4042
$this->container = $value;
@@ -60,6 +62,16 @@ public function setPrefetchCount(int $value): void
6062
$this->prefetchCount = $value;
6163
}
6264

65+
public function setNoAcknowledged(bool $value): void
66+
{
67+
$this->noAcknowledged = $value;
68+
}
69+
70+
protected function getNoAcknowledged(): bool
71+
{
72+
return $this->noAcknowledged;
73+
}
74+
6375
/**
6476
* Listen to the given queue in a loop.
6577
*
@@ -100,7 +112,7 @@ public function daemon($connectionName, $queue, WorkerOptions $options)
100112
$queue,
101113
$this->consumerTag,
102114
false,
103-
false,
115+
$this->getNoAcknowledged(),
104116
false,
105117
false,
106118
function (AMQPMessage $message) use ($connection, $options, $connectionName, $queue, $jobClass, &$jobsProcessed): void {

0 commit comments

Comments
 (0)