Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions src/Clients/Consumer/KafkaConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@

public function __construct(ConsumerConfig $config, LoggerInterface|null $logger = null)
{
$this->logger = $logger ?? new NullLogger();

Check warning on line 36 in src/Clients/Consumer/KafkaConsumer.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "Coalesce": @@ @@ private bool $shouldRun = true; public function __construct(ConsumerConfig $config, LoggerInterface|null $logger = null) { - $this->logger = $logger ?? new NullLogger(); + $this->logger = new NullLogger() ?? $logger; $this->setupInternalTerminationSignal($config); $config->getConf()->setErrorCb(function (RdKafkaConsumer $kafka, int $err, string $reason): void { $this->logger->error(sprintf('Kafka error: "%s": "%s"', rd_kafka_err2str($err), $reason), ['err' => $err]);

$this->setupInternalTerminationSignal($config);

Check warning on line 38 in src/Clients/Consumer/KafkaConsumer.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "MethodCallRemoval": @@ @@ public function __construct(ConsumerConfig $config, LoggerInterface|null $logger = null) { $this->logger = $logger ?? new NullLogger(); - $this->setupInternalTerminationSignal($config); + $config->getConf()->setErrorCb(function (RdKafkaConsumer $kafka, int $err, string $reason): void { $this->logger->error(sprintf('Kafka error: "%s": "%s"', rd_kafka_err2str($err), $reason), ['err' => $err]); });

$config->getConf()->setErrorCb(

Check warning on line 40 in src/Clients/Consumer/KafkaConsumer.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "MethodCallRemoval": @@ @@ { $this->logger = $logger ?? new NullLogger(); $this->setupInternalTerminationSignal($config); - $config->getConf()->setErrorCb(function (RdKafkaConsumer $kafka, int $err, string $reason): void { - $this->logger->error(sprintf('Kafka error: "%s": "%s"', rd_kafka_err2str($err), $reason), ['err' => $err]); - }); + $rebalanceCallback = function (RdKafkaConsumer $kafka, int $err, array|null $partitions = null): void { /** @phpstan-var array<string, TopicPartition>|null $partitions */ switch ($err) {
function (RdKafkaConsumer $kafka, int $err, string $reason): void {
$this->logger->error(
sprintf('Kafka error: "%s": "%s"', rd_kafka_err2str($err), $reason),
Expand All @@ -51,9 +51,9 @@
/** @phpstan-var array<string, TopicPartition>|null $partitions */
switch ($err) {
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
$this->logger->debug(

Check warning on line 54 in src/Clients/Consumer/KafkaConsumer.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "MethodCallRemoval": @@ @@ /** @phpstan-var array<string, TopicPartition>|null $partitions */ switch ($err) { case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: - $this->logger->debug('Assigning partitions', $partitions === null ? [] : array_map(static fn(TopicPartition $partition): string => (string) $partition->getPartition(), $partitions)); + $kafka->assign($partitions); break; case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
'Assigning partitions',
$partitions === null ? [] : array_map(

Check warning on line 56 in src/Clients/Consumer/KafkaConsumer.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "Ternary": @@ @@ /** @phpstan-var array<string, TopicPartition>|null $partitions */ switch ($err) { case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: - $this->logger->debug('Assigning partitions', $partitions === null ? [] : array_map(static fn(TopicPartition $partition): string => (string) $partition->getPartition(), $partitions)); + $this->logger->debug('Assigning partitions', $partitions === null ? array_map(static fn(TopicPartition $partition): string => (string) $partition->getPartition(), $partitions) : []); $kafka->assign($partitions); break; case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:

Check warning on line 56 in src/Clients/Consumer/KafkaConsumer.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "UnwrapArrayMap": @@ @@ /** @phpstan-var array<string, TopicPartition>|null $partitions */ switch ($err) { case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: - $this->logger->debug('Assigning partitions', $partitions === null ? [] : array_map(static fn(TopicPartition $partition): string => (string) $partition->getPartition(), $partitions)); + $this->logger->debug('Assigning partitions', $partitions === null ? [] : $partitions); $kafka->assign($partitions); break; case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
static fn (TopicPartition $partition): string => (string) $partition->getPartition(),
$partitions,
),
Expand All @@ -77,15 +77,15 @@
$kafka->assign();
}
};
$config->getConf()->setRebalanceCb($rebalanceCallback);

Check warning on line 80 in src/Clients/Consumer/KafkaConsumer.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "MethodCallRemoval": @@ @@ $kafka->assign(); } }; - $config->getConf()->setRebalanceCb($rebalanceCallback); + parent::__construct($config->getConf()); } /**

parent::__construct($config->getConf());
}

/**
* @param callable(Message) : void $onSuccess
* @param callable() : void $onPartitionEof
* @param callable() : void $onTimedOut
* @param callable(Message):void $onSuccess
* @param (callable():void)|null $onPartitionEof
* @param (callable():void)|null $onTimedOut
*/
public function start(
int $timeoutMs,
Expand All @@ -97,8 +97,8 @@
}

/**
* @param callable(Message) : void $processRecord
* @param callable(ConsumerRecords) : void $onBatchProcessed
* @param (callable(Message):void)|null $processRecord
* @param (callable(ConsumerRecords):void)|null $onBatchProcessed
*/
public function startBatch(
int $maxBatchSize,
Expand All @@ -120,8 +120,8 @@
$consumerRecords,
): void {
$consumerRecords->add($message);
if ($processRecord !== null) {

Check warning on line 123 in src/Clients/Consumer/KafkaConsumer.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "NotIdentical": @@ @@ $consumerRecords = new ConsumerRecords(); $this->doStart($timeoutMs, function (Message $message) use ($maxBatchSize, $timeoutMs, $batchTime, $processRecord, $onBatchProcessed, $consumerRecords): void { $consumerRecords->add($message); - if ($processRecord !== null) { + if ($processRecord === null) { $processRecord($message); } if ($consumerRecords->count() === $maxBatchSize) {
$processRecord($message);

Check warning on line 124 in src/Clients/Consumer/KafkaConsumer.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "FunctionCallRemoval": @@ @@ $this->doStart($timeoutMs, function (Message $message) use ($maxBatchSize, $timeoutMs, $batchTime, $processRecord, $onBatchProcessed, $consumerRecords): void { $consumerRecords->add($message); if ($processRecord !== null) { - $processRecord($message); + } if ($consumerRecords->count() === $maxBatchSize) { if ($onBatchProcessed !== null && !$consumerRecords->isEmpty()) {
}

if ($consumerRecords->count() === $maxBatchSize) {
Expand Down Expand Up @@ -150,9 +150,9 @@
}

/**
* @param callable(Message): void $onSuccess
* @param callable() : void $onPartitionEof
* @param callable() : void $onTimedOut
* @param callable(Message):void $onSuccess
* @param (callable():void)|null $onPartitionEof
* @param (callable():void)|null $onTimedOut
*/
private function doStart(
int $timeoutMs,
Expand Down
Loading