Skip to content

Commit

Permalink
Merge pull request #6 from simPod/psalm
Browse files Browse the repository at this point in the history
Introduce PSalm
  • Loading branch information
simPod committed May 19, 2020
2 parents 31a2b66 + 8c8b255 commit 92d5e0a
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 8 deletions.
36 changes: 36 additions & 0 deletions .github/workflows/shepherd.yml
@@ -0,0 +1,36 @@
name: Run Shepherd

on:
pull_request:
push:
branches:
- "master"

env:
LIBRDKAFKA_VERSION: v1.4.2

jobs:
build:
runs-on: ubuntu-18.04

steps:
- uses: actions/checkout@v2

- name: Install librdkafka
run: |
chmod +x .ci/install_rdkafka.sh
.ci/install_rdkafka.sh
- name: Install PHP
uses: shivammathur/setup-php@2.2.0
with:
php-version: 7.3
coverage: none
tools: pecl
extensions: json, mbstring, rdkafka-3.1.2

- name: Install dependencies
run: composer install --prefer-dist --no-progress --no-suggest

- name: Run Psalm
run: vendor/bin/psalm --threads=4 --output-format=github --shepherd
1 change: 1 addition & 0 deletions README.md
Expand Up @@ -6,6 +6,7 @@
[![Packagist](https://poser.pugx.org/simpod/kafka/v/stable.svg)](https://packagist.org/packages/simpod/kafka)
[![Licence](https://poser.pugx.org/simpod/kafka/license.svg)](https://packagist.org/packages/simpod/kafka)
[![GitHub Issues](https://img.shields.io/github/issues/simPod/PhpKafka.svg?style=flat-square)](https://github.com/simPod/PhpKafka/issues)
[![Type Coverage](https://shepherd.dev/github/simPod/PhpKafka/coverage.svg)](https://shepherd.dev/github/simPod/PhpKafka)

## Installation

Expand Down
4 changes: 3 additions & 1 deletion composer.json
Expand Up @@ -27,7 +27,9 @@
"phpstan/phpstan-phpunit": "0.12.8",
"phpstan/phpstan-strict-rules": "0.12.2",
"phpunit/phpunit": "^9.1",
"simpod/php-coveralls-mirror": "^3.0"
"psalm/plugin-phpunit": "^0.10.0",
"simpod/php-coveralls-mirror": "^3.0",
"vimeo/psalm": "dev-master#47cf69d as 3.11.4"
},
"suggest": {
"kwn/php-rdkafka-stubs": "Support and autocompletion for RDKafka in IDE | require as dev dependency"
Expand Down
19 changes: 19 additions & 0 deletions psalm.xml
@@ -0,0 +1,19 @@
<?xml version="1.0"?>
<psalm
totallyTyped="true"
resolveFromConfigFile="true"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="https://getpsalm.org/schema/config"
xsi:schemaLocation="https://getpsalm.org/schema/config vendor/vimeo/psalm/config.xsd"
>
<projectFiles>
<directory name="src" />
<ignoreFiles>
<directory name="vendor" />
</ignoreFiles>
</projectFiles>

<plugins>
<pluginClass class="Psalm\PhpUnitPlugin\Plugin" />
</plugins>
</psalm>
15 changes: 8 additions & 7 deletions src/Clients/Consumer/KafkaConsumer.php
Expand Up @@ -36,16 +36,17 @@ public function __construct(ConsumerConfig $config, ?LoggerInterface $logger = n
$this->setupInternalTerminationSignal($config);

$config->getConf()->setErrorCb(
function ($kafka, $err, $reason) : void {
function (RdKafkaConsumer $kafka, int $err, string $reason) : void {
$this->logger->error(
sprintf('Kafka error: "%s": "%s"', rd_kafka_err2str($err), $reason),
['err' => $err]
);
}
);

$config->getConf()->setRebalanceCb(
function (KafkaConsumer $kafka, int $err, ?array $partitions = null) : void {
$rebalanceCb =
/** @param array<string, TopicPartition>|null $partitions */
function (RdKafkaConsumer $kafka, int $err, ?array $partitions = null) : void {
switch ($err) {
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
$this->logger->info(
Expand All @@ -69,14 +70,14 @@ static function (TopicPartition $partition) : string {
$partitions
)
);
$kafka->assign(null);
$kafka->assign();
break;
default:
$this->logger->error(sprintf('Rebalancing failed: %s (%d)', rd_kafka_err2str($err), $err));
$kafka->assign(null);
$kafka->assign();
}
}
);
};
$config->getConf()->setRebalanceCb($rebalanceCb);

parent::__construct($config->getConf());
}
Expand Down
1 change: 1 addition & 0 deletions src/Clients/Producer/KafkaProducer.php
Expand Up @@ -19,6 +19,7 @@ public function __construct(ProducerConfig $config)
public function produce(ProducerRecord $record) : void
{
$topic = $this->newTopic($record->topic);
/** @psalm-suppress UndefinedMethod https://github.com/vimeo/psalm/issues/3406 */
$topic->produce($record->partition ?? RD_KAFKA_PARTITION_UA, self::RD_KAFKA_MSG_F_COPY, $record->value, $record->key);
$this->poll(0);
}
Expand Down

0 comments on commit 92d5e0a

Please sign in to comment.