Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/Clients/Consumer/KafkaConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -120,22 +120,22 @@
$consumerRecords,
): void {
$consumerRecords->add($message);
if ($processRecord !== null) {

Check warning on line 123 in src/Clients/Consumer/KafkaConsumer.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "NotIdentical": @@ @@ $consumerRecords = new ConsumerRecords(); $this->doStart($timeoutMs, function (Message $message) use ($maxBatchSize, $timeoutMs, $batchTime, $processRecord, $onBatchProcessed, $consumerRecords): void { $consumerRecords->add($message); - if ($processRecord !== null) { + if ($processRecord === null) { $processRecord($message); } if ($consumerRecords->count() === $maxBatchSize) {
$processRecord($message);

Check warning on line 124 in src/Clients/Consumer/KafkaConsumer.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "FunctionCallRemoval": @@ @@ $this->doStart($timeoutMs, function (Message $message) use ($maxBatchSize, $timeoutMs, $batchTime, $processRecord, $onBatchProcessed, $consumerRecords): void { $consumerRecords->add($message); if ($processRecord !== null) { - $processRecord($message); + } if ($consumerRecords->count() === $maxBatchSize) { if ($onBatchProcessed !== null && !$consumerRecords->isEmpty()) {
}

if ($consumerRecords->count() === $maxBatchSize) {
if ($onBatchProcessed !== null && ! $consumerRecords->isEmpty()) {

Check warning on line 128 in src/Clients/Consumer/KafkaConsumer.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "LogicalAnd": @@ @@ $processRecord($message); } if ($consumerRecords->count() === $maxBatchSize) { - if ($onBatchProcessed !== null && !$consumerRecords->isEmpty()) { + if ($onBatchProcessed !== null || !$consumerRecords->isEmpty()) { $onBatchProcessed($consumerRecords); } $consumerRecords->clear();
$onBatchProcessed($consumerRecords);
}

$consumerRecords->clear();

Check warning on line 132 in src/Clients/Consumer/KafkaConsumer.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "MethodCallRemoval": @@ @@ if ($onBatchProcessed !== null && !$consumerRecords->isEmpty()) { $onBatchProcessed($consumerRecords); } - $consumerRecords->clear(); + $batchTime->reset($timeoutMs, new DateTimeImmutable()); return; }
$batchTime->reset($timeoutMs, new DateTimeImmutable());

Check warning on line 133 in src/Clients/Consumer/KafkaConsumer.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "MethodCallRemoval": @@ @@ $onBatchProcessed($consumerRecords); } $consumerRecords->clear(); - $batchTime->reset($timeoutMs, new DateTimeImmutable()); + return; } $this->checkBatchTimedOut($timeoutMs, $batchTime, $onBatchProcessed, $consumerRecords)();

return;

Check warning on line 135 in src/Clients/Consumer/KafkaConsumer.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "ReturnRemoval": @@ @@ } $consumerRecords->clear(); $batchTime->reset($timeoutMs, new DateTimeImmutable()); - return; + } $this->checkBatchTimedOut($timeoutMs, $batchTime, $onBatchProcessed, $consumerRecords)(); }, $this->checkBatchTimedOut($timeoutMs, $batchTime, $onBatchProcessed, $consumerRecords), $this->checkBatchTimedOut($timeoutMs, $batchTime, $onBatchProcessed, $consumerRecords));
}

$this->checkBatchTimedOut($timeoutMs, $batchTime, $onBatchProcessed, $consumerRecords)();

Check warning on line 138 in src/Clients/Consumer/KafkaConsumer.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "FunctionCallRemoval": @@ @@ $batchTime->reset($timeoutMs, new DateTimeImmutable()); return; } - $this->checkBatchTimedOut($timeoutMs, $batchTime, $onBatchProcessed, $consumerRecords)(); + }, $this->checkBatchTimedOut($timeoutMs, $batchTime, $onBatchProcessed, $consumerRecords), $this->checkBatchTimedOut($timeoutMs, $batchTime, $onBatchProcessed, $consumerRecords)); } public function shutdown(): void
},
$this->checkBatchTimedOut($timeoutMs, $batchTime, $onBatchProcessed, $consumerRecords),
$this->checkBatchTimedOut($timeoutMs, $batchTime, $onBatchProcessed, $consumerRecords),
Expand All @@ -144,8 +144,13 @@

public function shutdown(): void
{
$this->logger->debug('Shutting down');

Check warning on line 147 in src/Clients/Consumer/KafkaConsumer.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "MethodCallRemoval": @@ @@ } public function shutdown(): void { - $this->logger->debug('Shutting down'); + $this->stop(); } public function stop(): void

$this->stop();
}

public function stop(): void

Check warning on line 152 in src/Clients/Consumer/KafkaConsumer.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "PublicVisibility": @@ @@ $this->logger->debug('Shutting down'); $this->stop(); } - public function stop(): void + protected function stop(): void { $this->shouldRun = false; }
{
$this->shouldRun = false;
}

Expand All @@ -160,7 +165,9 @@
callable|null $onPartitionEof = null,
callable|null $onTimedOut = null,
): void {
$this->registerSignals($this->shouldRun);
$this->shouldRun = true;
$terminationCallback = fn () => $this->stop();
$this->registerSignals($terminationCallback);

while ($this->shouldRun) {
$message = $this->consume($timeoutMs);
Expand Down
13 changes: 8 additions & 5 deletions src/Clients/Consumer/WithSignalControl.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

namespace SimPod\Kafka\Clients\Consumer;

use Safe\Exceptions\PcntlException;

use function Safe\pcntl_signal;
use function Safe\pcntl_sigprocmask;

Expand All @@ -22,12 +24,13 @@ private function setupInternalTerminationSignal(ConsumerConfig $config): void
$config->set('internal.termination.signal', SIGIO);
}

private function registerSignals(bool &$shouldRun): void
/**
* @param callable():mixed $terminationCallback
*
* @throws PcntlException
*/
private function registerSignals(callable $terminationCallback): void
{
$terminationCallback = static function () use (&$shouldRun): void {
$shouldRun = false;
};

pcntl_signal(SIGTERM, $terminationCallback);
pcntl_signal(SIGINT, $terminationCallback);
pcntl_signal(SIGHUP, $terminationCallback);
Expand Down
Loading